update and cache metadata from URL and path (#10196)

This commit is contained in:
Benjamin Dauvergne 2019-06-07 21:46:07 +02:00
parent 83a09d874e
commit 39e2e7e5ac
11 changed files with 380 additions and 205 deletions

34
README
View File

@ -76,11 +76,23 @@ overridden in the identity provider settings by removing the
MELLON_IDENTITY_PROVIDERS
-------------------------
A list of dictionaries, only one key is mandatory in those
dictionaries `METADATA` it should contain the UTF-8 content of the
metadata file of the identity provider or if it starts with a slash
the absolute path toward a metadata file. All other keys are override
of generic settings.
A list of dictionaries, they must contain at least one of the keys `METADATA`
(inline copy of the identity provider metadata), `METADATA_URL` URL of the IdP
metadata file, or `METADATA_PATH` an absolute path to the IdP metadata file.
All other keys are override of generic settings.
When using an URL, the URL is automatically cached in the `MEDIA_ROOT`
directory of your application in the directory named `mellon_metadata_cache`.
If you restart the application and the URL is unavailable, the file cache will
be used. The cache will be refreshed every `MELLON_METADATA_CACHE_TIME` seconds.
If the HTTP retrieval of the metadata URL takes longer than
`MELLON_METADATA_HTTP_TIMEOUT` seconds, retrieval will be skipped.
When the cache is already loaded, retrievals are done in the background by a
thread.
When using a local absolute path, the metadata is reloaded each time the
modification time of the file is superior to the last time it was loaded.
MELLON_PUBLIC_KEYS
------------------
@ -287,6 +299,18 @@ The targeted user(s) field(s) should be as much as possible unique
individually, if not django-mellon will refuse to link multiple users matching
the rules.
MELLON_METADATA_CACHE_TIME
--------------------------
When using METADATA_URL to reference a metadata file, it's the duration in
seconds between refreshes of the metadata file. Default is 3600 seconds, 1 hour.
MELLON_METADATA_HTTP_TIMEOUT
----------------------------
Timeout in seconds for HTTP call made to retrieve metadata files. Default is 10
seconds.
Tests
=====

6
debian/control vendored
View File

@ -15,7 +15,8 @@ Depends: ${misc:Depends}, ${python:Depends},
python (>= 2.7),
python-django (>= 1.5),
python-isodate,
python-lasso
python-lasso,
python-atomicwrites
Breaks: python-hobo (<< 0.34.5)
Description: SAML authentication for Django
@ -24,5 +25,6 @@ Architecture: all
Depends: ${misc:Depends}, ${python:Depends},
python3-django (>= 1.5),
python3-isodate,
python3-lasso
python3-lasso,
python3-atomicwrites
Description: SAML authentication for Django

View File

@ -13,24 +13,33 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import uuid
from xml.etree import ElementTree as ET
import hashlib
import logging
import os
import threading
import time
import uuid
import lasso
import requests
import requests.exceptions
from atomicwrites import atomic_write
from django.core.exceptions import PermissionDenied, FieldDoesNotExist
from django.core.files.storage import default_storage
from django.contrib import auth
from django.contrib.auth.models import Group
from django.utils import six
from django.utils.encoding import force_text
from django.utils.six.moves.urllib.parse import urlparse
from . import utils, app_settings, models
User = auth.get_user_model()
logger = logging.getLogger(__name__)
class UserCreationError(Exception):
pass
@ -46,9 +55,6 @@ def display_truncated_list(l, max_length=10):
class DefaultAdapter(object):
def __init__(self, *args, **kwargs):
self.logger = logging.getLogger(__name__)
def get_idp(self, entity_id):
'''Find the first IdP definition matching entity_id'''
for idp in self.get_idps():
@ -63,40 +69,181 @@ class DefaultAdapter(object):
def get_idps(self):
for i, idp in enumerate(self.get_identity_providers_setting()):
if 'METADATA_URL' in idp and 'METADATA' not in idp:
if self.load_idp(idp, i):
yield idp
def load_metadata_path(self, idp, i):
path = idp['METADATA_PATH']
if not os.path.exists(path):
logger.warning('metadata path %s does not exist', path)
return
last_update = idp.get('METADATA_PATH_LAST_UPDATE', 0)
try:
mtime = os.stat(path).st_mtime
except OSError as e:
logger.warning('metadata path %s : stat() call failed, %s', path, e)
return
if last_update == 0 or mtime >= last_update:
idp['METADATA_PATH_LAST_UPDATE'] = time.time()
try:
with open(path) as fd:
metadata = fd.read()
except OSError as e:
logger.warning('metadata path %s : open()/read() call failed, %s', path, e)
return
entity_id = self.load_entity_id(metadata, i)
if not entity_id:
logger.error('invalid metadata file retrieved from %s', path)
return
if 'ENTITY_ID' in idp and idp['ENTITY_ID'] != entity_id:
logger.error('metadata path %s : entityID changed %r != %r', path, entity_id, idp['ENTITY_ID'])
del idp['ENTITY_ID']
idp['METADATA'] = metadata
def load_metadata_url(self, idp, i):
url = idp['METADATA_URL']
metadata_cache_time = utils.get_setting(idp, 'METADATA_CACHE_TIME')
timeout = utils.get_setting(idp, 'METADATA_HTTP_TIMEOUT')
warning = logger.warning
if 'METADATA' not in idp:
# if we have no metadata in cache, we must emit errors
warning = logger.error
try:
hostname = urlparse(url).hostname
except (ValueError, TypeError) as e:
warning('invalid METADATA_URL %r: %s', url, e)
return
if not hostname:
warning('no hostname in METADATA_URL %r: %s', url)
return
last_update = idp.get('METADATA_URL_LAST_UPDATE', 0)
now = time.time()
try:
url_fingerprint = hashlib.md5(url.encode('ascii')).hexdigest()
file_cache_key = '%s_%s.xml' % (hostname, url_fingerprint)
except (UnicodeError, TypeError, ValueError):
warning('unable to compute file_cache_key')
return
cache_directory = default_storage.path('mellon_metadata_cache')
file_cache_path = os.path.join(cache_directory, file_cache_key)
if metadata_cache_time:
# METADATA_CACHE_TIME == 0 disable the file cache
if not os.path.exists(cache_directory):
os.makedirs(cache_directory)
if os.path.exists(file_cache_path) and 'METADATA' not in idp:
try:
with open(file_cache_path) as fd:
idp['METADATA'] = fd.read()
# use file cache mtime as last_update time, prevent too many loading from different workers
last_update = max(last_update, os.stat(file_cache_path).st_mtime)
except OSError:
warning('metadata url %s : error when loading the file cache %s', url, file_cache_path)
# fresh cache, skip loading
if last_update and 'METADATA' in idp and (now - last_update) < metadata_cache_time:
return
def __http_get():
try:
verify_ssl_certificate = utils.get_setting(
idp, 'VERIFY_SSL_CERTIFICATE')
try:
response = requests.get(idp['METADATA_URL'], verify=verify_ssl_certificate)
response = requests.get(url, verify=verify_ssl_certificate, timeout=timeout)
response.raise_for_status()
except requests.exceptions.RequestException as e:
self.logger.error(
u'retrieval of metadata URL %r failed with error %s for %d-th idp',
idp['METADATA_URL'], e, i)
continue
idp['METADATA'] = response.text
elif 'METADATA' in idp:
if idp['METADATA'].startswith('/'):
idp['METADATA'] = open(idp['METADATA']).read()
else:
self.logger.error(u'missing METADATA or METADATA_URL in %d-th idp', i)
continue
if 'ENTITY_ID' not in idp:
try:
doc = ET.fromstring(idp['METADATA'])
except (TypeError, ET.ParseError):
self.logger.error(u'METADATA of %d-th idp is invalid', i)
continue
if doc.tag != '{%s}EntityDescriptor' % lasso.SAML2_METADATA_HREF:
self.logger.error(u'METADATA of %d-th idp has no EntityDescriptor root tag', i)
continue
warning('metadata url %s : HTTP request failed %s', url, e)
return
if 'entityID' not in doc.attrib:
self.logger.error(
u'METADATA of %d-th idp has no entityID attribute on its root tag', i)
continue
idp['ENTITY_ID'] = doc.attrib['entityID']
yield idp
entity_id = self.load_entity_id(response.text, i)
if not entity_id:
warning('invalid metadata file retrieved from %s', url)
return
if 'ENTITY_ID' in idp and idp['ENTITY_ID'] != entity_id:
# entityID change is always an error
logger.error('metadata url %s : entityID changed %r != %r', url, entity_id, idp['ENTITY_ID'])
del idp['ENTITY_ID']
idp['METADATA'] = response.text
idp['METADATA_URL_LAST_UPDATE'] = now
if metadata_cache_time:
try:
with atomic_write(file_cache_path, mode='wb', overwrite=True) as fd:
fd.write(response.text.encode('utf-8'))
except OSError as e:
logger.error('metadata url %s : could not write file cache %s, %s', url, file_cache_path, e)
idp['METADATA_PATH'] = file_cache_path
# prevent reloading of the file cache immediately
idp['METADATA_PATH_LAST_UPDATE'] = time.time() + 1
logger.debug('metadata url %s : update throught HTTP', url)
finally:
# release thread object
idp.pop('METADATA_URL_UPDATE_THREAD', None)
# emit an error if cache is too old
stale_timeout = 24 * metadata_cache_time
if last_update and (now - idp['METADATA_URL_LAST_UPDATE']) > stale_timeout:
logger.error('metadata url %s : not updated since %.1f hours',
stale_timeout / 3600.0)
# we have cache, update in background
if last_update and 'METADATA' in idp:
t = threading.Thread(target=__http_get)
t.start()
# store thread in idp for tests
idp['METADATA_URL_UPDATE_THREAD'] = t
# suspend updates for HTTP timeout + 5 seconds
idp['METADATA_URL_LAST_UPDATE'] = last_update + timeout + 5
else:
# synchronous update
__http_get()
def load_metadata(self, idp, i):
# legacy support
if 'METADATA' in idp and idp['METADATA'].startswith('/'):
idp['METADATA_PATH'] = idp['METADATA']
del idp['METADATA']
if 'METADATA_PATH' in idp:
self.load_metadata_path(idp, i)
if 'METADATA_URL' in idp:
self.load_metadata_url(idp, i)
if 'METADATA' in idp:
if 'ENTITY_ID' not in idp:
entity_id = self.load_entity_id(idp['METADATA'], i)
if entity_id:
idp['ENTITY_ID'] = entity_id
if 'ENTITY_ID' in idp:
return idp['METADATA']
def load_entity_id(self, metadata, i):
try:
doc = ET.fromstring(metadata)
except (TypeError, ET.ParseError):
logger.error(u'METADATA of %d-th idp is invalid', i)
return None
if doc.tag != '{%s}EntityDescriptor' % lasso.SAML2_METADATA_HREF:
logger.error(u'METADATA of %d-th idp has no EntityDescriptor root tag', i)
return None
if 'entityID' not in doc.attrib:
logger.error(
u'METADATA of %d-th idp has no entityID attribute on its root tag', i)
return None
return doc.attrib['entityID']
def load_idp(self, idp, i):
self.load_metadata(idp, i)
return 'ENTITY_ID' in idp
def authorize(self, idp, saml_attributes):
if not idp:
@ -116,12 +263,12 @@ class DefaultAdapter(object):
username = force_text(username_template).format(
realm=realm, attributes=saml_attributes, idp=idp)[:30]
except ValueError:
self.logger.error(u'invalid username template %r', username_template)
logger.error(u'invalid username template %r', username_template)
except (AttributeError, KeyError, IndexError) as e:
self.logger.error(
logger.error(
u'invalid reference in username template %r: %s', username_template, e)
except Exception:
self.logger.exception(u'unknown error when formatting username')
logger.exception(u'unknown error when formatting username')
else:
return username
@ -131,7 +278,7 @@ class DefaultAdapter(object):
def finish_create_user(self, idp, saml_attributes, user):
username = self.format_username(idp, saml_attributes)
if not username:
self.logger.warning('could not build a username, login refused')
logger.warning('could not build a username, login refused')
raise UserCreationError
user.username = username
user.save()
@ -146,8 +293,8 @@ class DefaultAdapter(object):
if len(name_id) == 1:
name_id = name_id[0]
else:
self.logger.warning('more than one value for attribute %r, cannot federate',
transient_federation_attribute)
logger.warning('more than one value for attribute %r, cannot federate',
transient_federation_attribute)
return None
else:
return None
@ -158,8 +305,7 @@ class DefaultAdapter(object):
user = self.get_users_queryset(idp, saml_attributes).get(
saml_identifiers__name_id=name_id,
saml_identifiers__issuer=issuer)
self.logger.info('looked up user %s with name_id %s from issuer %s',
user, name_id, issuer)
logger.info('looked up user %s with name_id %s from issuer %s', user, name_id, issuer)
return user
except User.DoesNotExist:
pass
@ -172,15 +318,14 @@ class DefaultAdapter(object):
created = False
if not user:
if not utils.get_setting(idp, 'PROVISION'):
self.logger.debug('provisionning disabled, login refused')
logger.debug('provisionning disabled, login refused')
return None
created = True
user = self.create_user(User)
nameid_user = self._link_user(idp, saml_attributes, issuer, name_id, user)
if user != nameid_user:
self.logger.info('looked up user %s with name_id %s from issuer %s',
nameid_user, name_id, issuer)
logger.info('looked up user %s with name_id %s from issuer %s', nameid_user, name_id, issuer)
if created:
user.delete()
return nameid_user
@ -191,38 +336,37 @@ class DefaultAdapter(object):
except UserCreationError:
user.delete()
return None
self.logger.info('created new user %s with name_id %s from issuer %s',
nameid_user, name_id, issuer)
logger.info('created new user %s with name_id %s from issuer %s', nameid_user, name_id, issuer)
return nameid_user
def _lookup_by_attributes(self, idp, saml_attributes, lookup_by_attributes):
if not isinstance(lookup_by_attributes, list):
self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: it must be a list', lookup_by_attributes)
logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: it must be a list', lookup_by_attributes)
return None
users = set()
for line in lookup_by_attributes:
if not isinstance(line, dict):
self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: it must be a list of dicts', line)
logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: it must be a list of dicts', line)
continue
user_field = line.get('user_field')
if not hasattr(user_field, 'isalpha'):
self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: user_field is missing', line)
logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: user_field is missing', line)
continue
try:
User._meta.get_field(user_field)
except FieldDoesNotExist:
self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r, user field %s does not exist',
line, user_field)
logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r, user field %s does not exist',
line, user_field)
continue
saml_attribute = line.get('saml_attribute')
if not hasattr(saml_attribute, 'isalpha'):
self.logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: saml_attribute is missing', line)
logger.error('invalid LOOKUP_BY_ATTRIBUTES configuration %r: saml_attribute is missing', line)
continue
values = saml_attributes.get(saml_attribute)
if not values:
self.logger.error('looking for user by saml attribute %r and user field %r, skipping because empty',
saml_attribute, user_field)
logger.error('looking for user by saml attribute %r and user field %r, skipping because empty',
saml_attribute, user_field)
continue
ignore_case = line.get('ignore-case', False)
for value in values:
@ -232,20 +376,19 @@ class DefaultAdapter(object):
users_found = self.get_users_queryset(idp, saml_attributes).filter(
saml_identifiers__isnull=True, **{key: value})
if not users_found:
self.logger.debug('looking for users by attribute %r and user field %r with value %r: not found',
saml_attribute, user_field, value)
logger.debug('looking for users by attribute %r and user field %r with value %r: not found',
saml_attribute, user_field, value)
continue
self.logger.info(u'looking for user by attribute %r and user field %r with value %r: found %s',
saml_attribute, user_field, value, display_truncated_list(users_found))
logger.info(u'looking for user by attribute %r and user field %r with value %r: found %s',
saml_attribute, user_field, value, display_truncated_list(users_found))
users.update(users_found)
if len(users) == 1:
user = list(users)[0]
self.logger.info(u'looking for user by attributes %r: found user %s',
lookup_by_attributes, user)
logger.info(u'looking for user by attributes %r: found user %s', lookup_by_attributes, user)
return user
elif len(users) > 1:
self.logger.warning(u'looking for user by attributes %r: too many users found(%d), failing',
lookup_by_attributes, len(users))
logger.warning(u'looking for user by attributes %r: too many users found(%d), failing',
lookup_by_attributes, len(users))
return None
def _link_user(self, idp, saml_attributes, issuer, name_id, user):
@ -269,9 +412,9 @@ class DefaultAdapter(object):
try:
value = force_text(tpl).format(realm=realm, attributes=saml_attributes, idp=idp)
except ValueError:
self.logger.warning(u'invalid attribute mapping template %r', tpl)
logger.warning(u'invalid attribute mapping template %r', tpl)
except (AttributeError, KeyError, IndexError, ValueError) as e:
self.logger.warning(
logger.warning(
u'invalid reference in attribute mapping template %r: %s', tpl, e)
else:
model_field = user._meta.get_field(field)
@ -281,8 +424,7 @@ class DefaultAdapter(object):
old_value = getattr(user, field)
setattr(user, field, value)
attribute_set = True
self.logger.info(u'set field %s of user %s to value %r (old value %r)', field,
user, value, old_value)
logger.info(u'set field %s of user %s to value %r (old value %r)', field, user, value, old_value)
if attribute_set:
user.save()
@ -305,13 +447,13 @@ class DefaultAdapter(object):
user.is_staff = True
user.is_superuser = True
attribute_set = True
self.logger.info('flag is_staff and is_superuser added to user %s', user)
logger.info('flag is_staff and is_superuser added to user %s', user)
break
else:
if user.is_superuser or user.is_staff:
user.is_staff = False
user.is_superuser = False
self.logger.info('flag is_staff and is_superuser removed from user %s', user)
logger.info('flag is_staff and is_superuser removed from user %s', user)
attribute_set = True
if attribute_set:
user.save()
@ -334,12 +476,11 @@ class DefaultAdapter(object):
continue
groups.append(group)
for group in Group.objects.filter(pk__in=[g.pk for g in groups]).exclude(user=user):
self.logger.info(
logger.info(
u'adding group %s (%s) to user %s (%s)', group, group.pk, user, user.pk)
User.groups.through.objects.get_or_create(group=group, user=user)
qs = User.groups.through.objects.exclude(
group__pk__in=[g.pk for g in groups]).filter(user=user)
for rel in qs:
self.logger.info(u'removing group %s (%s) from user %s (%s)', rel.group,
rel.group.pk, rel.user, rel.user.pk)
logger.info(u'removing group %s (%s) from user %s (%s)', rel.group, rel.group.pk, rel.user, rel.user.pk)
qs.delete()

View File

@ -41,6 +41,8 @@ class AppSettings(object):
'LOGIN_HINTS': [],
'SIGNATURE_METHOD': 'RSA-SHA256',
'LOOKUP_BY_ATTRIBUTES': [],
'METADATA_CACHE_TIME': 3600,
'METADATA_HTTP_TIMEOUT': 10,
}
@property

View File

@ -95,12 +95,11 @@ def create_server(request):
key = key[0]
server.setEncryptionPrivateKeyWithPassword(key, password)
for idp in get_idps():
try:
server.addProviderFromBuffer(lasso.PROVIDER_ROLE_IDP, idp['METADATA'])
except lasso.Error as e:
logger.error(u'bad metadata in idp %r', idp['ENTITY_ID'])
logger.debug(u'lasso error: %s', e)
continue
if idp and idp.get('METADATA'):
try:
server.addProviderFromBuffer(lasso.PROVIDER_ROLE_IDP, idp['METADATA'])
except lasso.Error as e:
logger.error(u'bad metadata in idp %s, %s', idp['ENTITY_ID'], e)
cache[root] = server
settings._MELLON_SERVER_CACHE = cache
return settings._MELLON_SERVER_CACHE.get(root)

View File

@ -169,6 +169,10 @@ class LoginView(ProfileMixin, LogMixin, View):
'''show error message to user after a login failure'''
login = self.profile
idp = utils.get_idp(login.remoteProviderId)
if not idp:
self.log.warning('entity id %r is unknown', login.remoteProviderId)
return HttpResponseBadRequest(
'entity id %r is unknown' % login.remoteProviderId)
error_url = utils.get_setting(idp, 'ERROR_URL')
error_redirect_after_timeout = utils.get_setting(idp, 'ERROR_REDIRECT_AFTER_TIMEOUT')
if error_url:
@ -391,7 +395,7 @@ class LoginView(ProfileMixin, LogMixin, View):
next_url = check_next_url(self.request, request.GET.get(REDIRECT_FIELD_NAME))
idp = self.get_idp(request)
if idp is None:
if not idp:
return HttpResponseBadRequest('no idp found')
self.profile = login = utils.create_login(request)
self.log.debug('authenticating to %r', idp['ENTITY_ID'])

View File

@ -94,6 +94,7 @@ setup(name="django-mellon",
'django>=1.5,<2.0',
'requests',
'isodate',
'atomicwrites',
],
setup_requires=[
'django>=1.5,<2.0',

View File

@ -13,13 +13,21 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import logging
import pytest
import django_webtest
@pytest.fixture(autouse=True)
def settings(settings, tmpdir):
settings.MEDIA_ROOT = str(tmpdir.mkdir('media'))
return settings
@pytest.fixture
def app(request):
def app(request, settings):
wtm = django_webtest.WebTestMixin()
wtm._patch_settings()
request.addfinalizer(wtm._unpatch_settings)
@ -38,7 +46,7 @@ def concurrency(settings):
@pytest.fixture
def private_settings(request):
def private_settings(request, tmpdir):
import django.conf
from django.conf import UserSettingsHolder
old = django.conf.settings._wrapped
@ -57,3 +65,17 @@ def caplog(caplog):
caplog.handler.stream = py.io.TextIO()
caplog.handler.records = []
return caplog
@pytest.fixture(scope='session')
def metadata():
with open(os.path.join(os.path.dirname(__file__), 'metadata.xml')) as fd:
yield fd.read()
@pytest.fixture
def metadata_path(tmpdir, metadata):
metadata_path = tmpdir / 'metadata.xml'
with metadata_path.open('w') as fd:
fd.write(metadata)
yield str(metadata_path)

View File

@ -13,11 +13,14 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pytest
import datetime
import re
import lasso
import time
from multiprocessing.pool import ThreadPool
import pytest
from django.contrib import auth
from django.db import connection
@ -167,7 +170,7 @@ def test_provision_is_superuser(settings, django_user_model, idp, saml_attribute
user = SAMLBackend().authenticate(saml_attributes=saml_attributes)
assert user.is_superuser is True
assert user.is_staff is True
assert not 'flag is_staff and is_superuser removed' in caplog.text
assert 'flag is_staff and is_superuser removed' not in caplog.text
def test_provision_absent_attribute(settings, django_user_model, idp, saml_attributes, caplog):
@ -326,3 +329,92 @@ def test_lookup_user_by_attributes_ignore_case(settings, idp, saml_attributes, j
{'user_field': 'username', 'saml_attribute': 'saml_at1', 'ignore-case': True},
]
assert adapter.lookup_user(idp, saml_attributes) == jane
@pytest.fixture
def adapter():
return DefaultAdapter()
def test_load_metadata_simple(adapter, metadata):
idp = {'METADATA': metadata}
assert adapter.load_metadata(idp, 0) == metadata
def test_load_metadata_legacy(adapter, metadata_path, metadata):
idp = {'METADATA': metadata_path}
assert adapter.load_metadata(idp, 0) == metadata
assert idp['METADATA'] == metadata
def test_load_metadata_path(adapter, metadata_path, metadata, freezer):
now = time.time()
idp = {'METADATA_PATH': str(metadata_path)}
assert adapter.load_metadata(idp, 0) == metadata
assert idp['METADATA'] == metadata
assert idp['METADATA_PATH_LAST_UPDATE'] == now
def test_load_metadata_url(settings, adapter, metadata, httpserver, freezer, caplog):
now = time.time()
httpserver.serve_content(content=metadata, headers={'Content-Type': 'application/xml'})
idp = {'METADATA_URL': httpserver.url}
assert adapter.load_metadata(idp, 0) == metadata
assert idp['METADATA'] == metadata
assert idp['METADATA_URL_LAST_UPDATE'] == now
assert 'METADATA_PATH' in idp
assert idp['METADATA_PATH'].startswith(settings.MEDIA_ROOT)
with open(idp['METADATA_PATH']) as fd:
assert fd.read() == metadata
assert idp['METADATA_PATH_LAST_UPDATE'] == now + 1
httpserver.serve_content(content=metadata.replace('idp5', 'idp6'),
headers={'Content-Type': 'application/xml'})
assert adapter.load_metadata(idp, 0) == metadata
freezer.move_to(datetime.timedelta(seconds=3601))
caplog.clear()
assert adapter.load_metadata(idp, 0) == metadata
# wait for update thread to finish
try:
idp['METADATA_URL_UPDATE_THREAD'].join()
except KeyError:
pass
new_meta = adapter.load_metadata(idp, 0)
assert new_meta != metadata
assert new_meta == metadata.replace('idp5', 'idp6')
assert 'entityID changed' in caplog.records[-1].message
assert caplog.records[-1].levelname == 'ERROR'
# test load from file cache
del idp['METADATA']
del idp['METADATA_PATH']
del idp['METADATA_PATH_LAST_UPDATE']
httpserver.serve_content(content='', headers={'Content-Type': 'application/xml'})
assert adapter.load_metadata(idp, 0) == metadata.replace('idp5', 'idp6')
def test_load_metadata_url_stale_timeout(settings, adapter, metadata, httpserver, freezer, caplog):
httpserver.serve_content(content=metadata, headers={'Content-Type': 'application/xml'})
idp = {'METADATA_URL': httpserver.url}
assert adapter.load_metadata(idp, 0) == metadata
httpserver.serve_content(content='', headers={'Content-Type': 'application/xml'})
assert adapter.load_metadata(idp, 0) == metadata
freezer.move_to(datetime.timedelta(seconds=24 * 3600 - 1))
assert adapter.load_metadata(idp, 0) == metadata
# wait for update thread to finish
try:
idp['METADATA_URL_UPDATE_THREAD'].join()
except KeyError:
pass
assert caplog.records[-1].levelname == 'WARNING'
freezer.move_to(datetime.timedelta(seconds=3601))
assert adapter.load_metadata(idp, 0) == metadata
# wait for update thread to finish
try:
idp['METADATA_URL_UPDATE_THREAD'].join()
except KeyError:
pass
assert caplog.records[-1].levelname == 'ERROR'

View File

@ -13,128 +13,14 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
import datetime
import mock
import lasso
import requests.exceptions
from httmock import HTTMock
from mellon.utils import create_server, create_metadata, iso8601_to_datetime, flatten_datetime
import mellon.utils
from mellon.utils import create_metadata, iso8601_to_datetime, flatten_datetime
from xml_utils import assert_xml_constraints
from utils import error_500, metadata_response
def test_create_server_connection_error(mocker, rf, private_settings, caplog):
mocker.patch('requests.get',
side_effect=requests.exceptions.ConnectionError('connection error'))
private_settings.MELLON_IDENTITY_PROVIDERS = [
{
'METADATA_URL': 'http://example.com/metadata',
}
]
request = rf.get('/')
create_server(request)
assert 'connection error' in caplog.text
def test_create_server_internal_server_error(mocker, rf, private_settings, caplog):
private_settings.MELLON_IDENTITY_PROVIDERS = [
{
'METADATA_URL': 'http://example.com/metadata',
}
]
request = rf.get('/')
assert not 'failed with error' in caplog.text
with HTTMock(error_500):
create_server(request)
assert 'failed with error' in caplog.text
def test_create_server_invalid_metadata(mocker, rf, private_settings, caplog):
private_settings.MELLON_IDENTITY_PROVIDERS = [
{
'METADATA': 'xxx',
}
]
request = rf.get('/')
assert not 'failed with error' in caplog.text
with HTTMock(error_500):
create_server(request)
assert len(caplog.records) == 1
assert re.search('METADATA.*is invalid', caplog.text)
def test_create_server_invalid_metadata_file(mocker, rf, private_settings, caplog):
private_settings.MELLON_IDENTITY_PROVIDERS = [
{
'METADATA': '/xxx',
}
]
request = rf.get('/')
assert not 'failed with error' in caplog.text
with mock.patch('mellon.adapters.open', mock.mock_open(read_data='yyy'), create=True):
with HTTMock(error_500):
server = create_server(request)
assert len(server.providers) == 0
def test_create_server_good_metadata_file(mocker, rf, private_settings, caplog):
private_settings.MELLON_IDENTITY_PROVIDERS = [
{
'METADATA': '/xxx',
}
]
request = rf.get('/')
with mock.patch(
'mellon.adapters.open', mock.mock_open(read_data=open('tests/metadata.xml').read()),
create=True):
server = create_server(request)
assert 'ERROR' not in caplog.text
assert len(server.providers) == 1
def test_create_server_good_metadata(mocker, rf, private_settings, caplog):
private_settings.MELLON_IDENTITY_PROVIDERS = [
{
'METADATA': open('tests/metadata.xml').read(),
}
]
request = rf.get('/')
assert not 'failed with error' in caplog.text
server = create_server(request)
assert 'ERROR' not in caplog.text
assert len(server.providers) == 1
def test_create_server_invalid_idp_dict(mocker, rf, private_settings, caplog):
private_settings.MELLON_IDENTITY_PROVIDERS = [
{
}
]
request = rf.get('/')
assert not 'failed with error' in caplog.text
create_server(request)
assert 'missing METADATA' in caplog.text
def test_create_server_good_metadata_url(mocker, rf, private_settings, caplog):
private_settings.MELLON_IDENTITY_PROVIDERS = [
{
'METADATA_URL': 'http://example.com/metadata',
}
]
request = rf.get('/')
assert not 'failed with error' in caplog.text
with HTTMock(metadata_response):
server = create_server(request)
assert 'ERROR' not in caplog.text
assert len(server.providers) == 1
def test_create_metadata(rf, private_settings, caplog):
ns = {

View File

@ -1,5 +1,5 @@
[tox]
envlist = {coverage-,}py2-{dj18,dj111}-{pg,sqlite},py3-dj111-{pg,sqlite}
envlist = coverage-py2-{dj18,dj111}-{pg,sqlite},coverage-py3-dj111-{pg,sqlite}
toxworkdir = {env:TMPDIR:/tmp}/tox-{env:USER}/django-mellon/
[testenv]
@ -24,6 +24,8 @@ deps =
pytest-random
pytest-mock
pytest-django
pytest-freezegun
pytest-localserver
pytz
lxml
cssselect