authentic/tests/conftest.py

492 lines
15 KiB
Python

# -*- coding: utf-8 -*-
# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pytest
import mock
import django_webtest
import django
from django.contrib.auth import get_user_model
from django.core.cache import cache
from django_rbac.utils import get_ou_model, get_role_model
from django.utils.six.moves.urllib import parse as urlparse
from django.db.migrations.executor import MigrationExecutor
from django.db import connection
from django.core.management import call_command
from pytest_django.migrations import DisableMigrations
from authentic2 import hooks as a2_hooks
from authentic2.models import Service
from authentic2.utils.evaluate import BaseExpressionValidator
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.manager.utils import get_ou_count
from authentic2_auth_oidc.utils import get_providers
from authentic2_auth_oidc.utils import get_provider_by_issuer
from authentic2_auth_oidc.utils import has_providers
from authentic2_idp_oidc.models import OIDCClient
from authentic2.authentication import OIDCUser
from . import utils
def pytest_addoption(parser):
    """Register the ``--slow`` command line flag on the pytest parser."""
    slow_option = dict(action="store_true", help="run slow tests")
    parser.addoption("--slow", **slow_option)
def pytest_configure(config):
    """Declare the additional ``slow`` marker in the pytest configuration."""
    slow_marker = "slow: mark test as slow"
    config.addinivalue_line("markers", slow_marker)
def pytest_runtest_setup(item):
    """Skip any test carrying the ``slow`` marker unless ``--slow`` was given."""
    slow_markers = [marker for marker in item.iter_markers(name='slow')]
    run_slow = item.config.getoption('--slow')
    if run_slow or not slow_markers:
        return
    pytest.skip('slow tests must not run')
# Role model resolved once at import time; the role fixtures in this module
# create their objects through it.
Role = get_role_model()
@pytest.fixture
def settings(settings, request):
    """Wrap the upstream ``settings`` fixture to control fixture ordering.

    When the running test also uses ``transactional_db``, that fixture is
    requested dynamically here, before the settings object is handed to the
    test. The settings are finalized once the test is over.
    """
    # Our post_migrate handlers depend upon some values of the settings (like
    # A2_RBAC_MANAGED_CONTENT_TYPES). Making the media fixture "autouse=True"
    # fixed the order of running settings and transactional_db, but
    # transactional_db uses flush(), which uses the post_migrate handlers to
    # restore a blank database state. To force the ordering of the
    # transactional_db and settings fixtures we need to override the latter
    # and use a dynamic call to the transactional_db fixture when needed.
    needs_transactional_db = 'transactional_db' in request.fixturenames
    if needs_transactional_db:
        request.getfixturevalue('transactional_db')
    yield settings
    settings.finalize()
@pytest.fixture
def app_factory():
    """Yield a factory building ``DjangoTestApp`` objects for a given hostname.

    django_webtest's settings patch is applied for the duration of the test
    and always reverted afterwards.
    """
    webtest_mixin = django_webtest.WebTestMixin()
    webtest_mixin._patch_settings()

    def factory(hostname='testserver'):
        # The hostname is sent as the HTTP Host header of every request.
        environ = {'HTTP_HOST': hostname}
        return django_webtest.DjangoTestApp(extra_environ=environ)

    try:
        yield factory
    finally:
        webtest_mixin._unpatch_settings()
@pytest.fixture
def app(app_factory):
    """Return a test application built with the factory's default hostname."""
    test_app = app_factory()
    return test_app
@pytest.fixture
def ou1(db):
    """Create and return the OU object named ``OU1`` (slug ``ou1``)."""
    return get_ou_model().objects.create(slug='ou1', name='OU1')
@pytest.fixture
def ou2(db):
    """Create and return the OU object named ``OU2`` (slug ``ou2``)."""
    return get_ou_model().objects.create(slug='ou2', name='OU2')
@pytest.fixture
def ou_rando(db):
    """Create and return the OU object named ``ou_rando`` (slug ``ou_rando``)."""
    return get_ou_model().objects.create(slug='ou_rando', name='ou_rando')
def create_user(**kwargs):
    """Get or create a user from ``kwargs`` and give it a password.

    ``password`` is removed from ``kwargs`` before the lookup; when it is
    missing or empty the username is used as the password. All remaining
    keyword arguments are passed to ``User.objects.get_or_create()``.

    Returns the user object.
    """
    User = get_user_model()
    # Use .get() so that a call providing neither a password nor a username
    # does not fail with KeyError; the password is then simply left untouched.
    password = kwargs.pop('password', None) or kwargs.get('username')
    user, _created = User.objects.get_or_create(**kwargs)
    if password:
        user.set_password(password)
        user.save()
    return user
@pytest.fixture
def simple_user(db, ou1):
    """Return the user ``user``, attached to the default OU.

    ``ou1`` is requested but not used directly by this fixture.
    """
    attributes = dict(
        username='user',
        first_name=u'Jôhn',
        last_name=u'Dôe',
        email='user@example.net',
        ou=get_default_ou(),
    )
    return create_user(**attributes)
@pytest.fixture
def superuser(db):
    """Return the active staff user ``superuser`` flagged as superuser."""
    attributes = dict(
        username='superuser',
        first_name='super',
        last_name='user',
        email='superuser@example.net',
        is_superuser=True,
        is_staff=True,
        is_active=True,
    )
    return create_user(**attributes)
@pytest.fixture
def admin(db):
    """Return the user ``admin`` of the default OU, holding ``_a2-manager``."""
    attributes = dict(
        username='admin',
        first_name='global',
        last_name='admin',
        email='admin@example.net',
        is_active=True,
        ou=get_default_ou(),
    )
    admin_user = create_user(**attributes)
    manager_role = get_role_model().objects.get(slug='_a2-manager')
    admin_user.roles.add(manager_role)
    return admin_user
@pytest.fixture
def user_ou1(db, ou1):
    """Return the user ``john.doe`` attached to ``ou1``."""
    attributes = dict(
        username='john.doe',
        first_name=u'Jôhn',
        last_name=u'Dôe',
        email='john.doe@example.net',
        ou=ou1,
    )
    return create_user(**attributes)
@pytest.fixture
def user_ou2(db, ou2):
    """Return the user ``john.doe`` attached to ``ou2``."""
    attributes = dict(
        username='john.doe',
        first_name=u'Jôhn',
        last_name=u'Dôe',
        email='john.doe@example.net',
        ou=ou2,
    )
    return create_user(**attributes)
@pytest.fixture
def admin_ou1(db, ou1):
    """Return the user ``admin.ou1`` of ``ou1``, holding that OU's admin role."""
    attributes = dict(
        username='admin.ou1',
        first_name=u'Admin',
        last_name=u'OU1',
        email='admin.ou1@example.net',
        ou=ou1,
    )
    ou_admin = create_user(**attributes)
    ou_admin.roles.add(ou1.get_admin_role())
    return ou_admin
@pytest.fixture
def admin_ou2(db, ou2):
    """Return the user ``admin.ou2`` of ``ou2``, holding that OU's admin role."""
    attributes = dict(
        username='admin.ou2',
        first_name=u'Admin',
        last_name=u'OU2',
        email='admin.ou2@example.net',
        ou=ou2,
    )
    ou_admin = create_user(**attributes)
    ou_admin.roles.add(ou2.get_admin_role())
    return ou_admin
@pytest.fixture
def admin_rando_role(db, role_random, ou_rando):
    """Return the user ``admin_rando`` holding the admin role of ``ou_rando``.

    ``role_random`` is requested but not used directly by this fixture.
    """
    attributes = dict(
        username='admin_rando',
        first_name='admin',
        last_name='rando',
        email='admin.rando@weird.com',
        ou=ou_rando,
    )
    rando_admin = create_user(**attributes)
    rando_admin.roles.add(ou_rando.get_admin_role())
    return rando_admin
@pytest.fixture(params=['superuser', 'user_ou1', 'user_ou2', 'admin_ou1', 'admin_ou2',
                        'admin_rando_role', 'member_rando'])
def user(request, superuser, user_ou1, user_ou2, admin_ou1, admin_ou2, admin_rando_role,
         member_rando):
    """Parametrized fixture returning, in turn, each of the user fixtures."""
    candidates = {
        'superuser': superuser,
        'user_ou1': user_ou1,
        'user_ou2': user_ou2,
        'admin_ou1': admin_ou1,
        'admin_ou2': admin_ou2,
        'admin_rando_role': admin_rando_role,
        'member_rando': member_rando,
    }
    return candidates.get(request.param)
@pytest.fixture
def logged_app(app, user):
    """Return ``app`` after logging ``user`` in through ``utils.login``."""
    utils.login(app, user)
    return app
@pytest.fixture
def simple_role(db):
    """Create and return the role ``simple-role`` in the default OU."""
    default_ou = get_default_ou()
    return Role.objects.create(slug='simple-role', name='simple role', ou=default_ou)
@pytest.fixture
def role_random(db, ou_rando):
    """Create and return the role ``rando`` inside ``ou_rando``."""
    attributes = {'name': 'rando', 'slug': 'rando', 'ou': ou_rando}
    return Role.objects.create(**attributes)
@pytest.fixture
def role_ou1(db, ou1):
    """Create and return the role ``role_ou1`` inside ``ou1``."""
    attributes = {'name': 'role_ou1', 'slug': 'role_ou1', 'ou': ou1}
    return Role.objects.create(**attributes)
@pytest.fixture
def role_ou2(db, ou2):
    """Create and return the role ``role_ou2`` inside ``ou2``."""
    attributes = {'name': 'role_ou2', 'slug': 'role_ou2', 'ou': ou2}
    return Role.objects.create(**attributes)
@pytest.fixture(params=['role_random', 'role_ou1', 'role_ou2'])
def role(request, role_random, role_ou1, role_ou2):
    """Parametrized fixture returning, in turn, each of the role fixtures."""
    candidates = {
        'role_random': role_random,
        'role_ou1': role_ou1,
        'role_ou2': role_ou2,
    }
    return candidates.get(request.param)
@pytest.fixture
def member_rando(db, ou_rando):
    """Return the user ``test`` attached to ``ou_rando``."""
    attributes = dict(
        username='test',
        first_name='test',
        last_name='test',
        email='test@test.org',
        ou=ou_rando,
    )
    return create_user(**attributes)
@pytest.fixture
def member_rando2(db, ou_rando):
    """Return the user ``test2`` attached to ``ou_rando``."""
    attributes = dict(
        username='test2',
        first_name='test2',
        last_name='test2',
        email='test2@test.org',
        ou=ou_rando,
    )
    return create_user(**attributes)
@pytest.fixture
def member_fake():
    """Return a dynamically built class (not an instance) posing as a user.

    The class only exposes the ``username`` and ``uuid`` attributes.
    """
    attributes = {'username': 'fake', 'uuid': 'fake_uuid'}
    return type('user', (object,), attributes)
@pytest.fixture(params=['member_rando', 'member_fake'])
def member(request, member_rando, member_fake):
    """Parametrized fixture returning the real member, then the fake one."""
    candidates = {
        'member_rando': member_rando,
        'member_fake': member_fake,
    }
    return candidates.get(request.param)
@pytest.fixture(params=['superuser', 'admin'])
def superuser_or_admin(request, superuser, admin):
    """Parametrized fixture returning the superuser, then the admin."""
    candidates = {
        'superuser': superuser,
        'admin': admin,
    }
    return candidates.get(request.param)
@pytest.fixture
def concurrency(settings):
    '''Select a level of concurrency based on the db. Currently only
       postgresql is supported.

       The returned level is currently the constant 100; ``settings`` is not
       inspected.
    '''
    level = 100
    return level
@pytest.fixture
def migrations():
    """Skip the requesting test when migrations are disabled in the settings."""
    from django.conf import settings as django_settings

    migrations_disabled = isinstance(django_settings.MIGRATION_MODULES, DisableMigrations)
    if migrations_disabled:
        pytest.skip('this test requires native migrations')
@pytest.fixture
def oidc_client(db, ou1):
    """Create an ``OIDCClient`` and return it wrapped as an ``OIDCUser``.

    The wrapper overrides a few user-like properties: its username is the
    client id, it is never a superuser, its ``roles`` mock always reports
    ``exists()`` as true, and its OU is ``ou1``.
    """
    client_attributes = dict(
        name='example',
        slug='example',
        client_id='example',
        client_secret='example',
        authorization_flow=1,
        post_logout_redirect_uris='https://example.net/redirect/',
        identifier_policy=OIDCClient.POLICY_UUID,
        has_api_access=True,
    )
    created_client = OIDCClient.objects.create(**client_attributes)

    class TestOIDCUser(OIDCUser):
        def __init__(self, oidc_client):
            super(TestOIDCUser, self).__init__(oidc_client)

        @property
        def username(self):
            return self.oidc_client.client_id

        @property
        def is_superuser(self):
            return False

        @property
        def roles(self):
            # A fresh mock on every access whose exists() always answers True.
            return mock.Mock(exists=lambda: True)

        @property
        def ou(self):
            return ou1

    return TestOIDCUser(created_client)
@pytest.fixture(params=['oidc_client', 'superuser', 'user_ou1', 'user_ou2',
                        'admin_ou1', 'admin_ou2', 'admin_rando_role', 'member_rando'])
def api_user(request, oidc_client, superuser, user_ou1, user_ou2,
             admin_ou1, admin_ou2, admin_rando_role, member_rando):
    """Parametrized fixture returning the OIDC client, then each user fixture."""
    candidates = {
        'oidc_client': oidc_client,
        'superuser': superuser,
        'user_ou1': user_ou1,
        'user_ou2': user_ou2,
        'admin_ou1': admin_ou1,
        'admin_ou2': admin_ou2,
        'admin_rando_role': admin_rando_role,
        'member_rando': member_rando,
    }
    return candidates.get(request.param)
@pytest.fixture(autouse=True)
def clear_cache():
    """Empty the Django cache and the listed memoization caches before a test."""
    ou_model = get_ou_model()
    cache.clear()
    BaseExpressionValidator.__call__.cache_clear()
    # Each of these callables exposes its cache through a ``cache`` attribute.
    memoized_callables = (
        ou_model.cached,
        a2_hooks.get_hooks,
        get_providers,
        get_provider_by_issuer,
        get_ou_count,
        has_providers,
    )
    for memoized in memoized_callables:
        memoized.cache.clear()
class AllHook(object):
    """Catch-all hook recording every call it receives, grouped by hook name.

    The calls recorded for a hook are read back as an attribute named after
    it: ``hook.event`` is the list of ``{'args': ..., 'kwargs': ...}`` dicts
    recorded for ``event``, or an empty list if it was never called.
    """

    def __init__(self):
        # Maps a hook name to the list of calls recorded for it.
        self.calls = {}

    def __call__(self, hook_name, *args, **kwargs):
        """Record one call of ``hook_name`` with its arguments."""
        recorded = self.calls.setdefault(hook_name, [])
        recorded.append({'args': args, 'kwargs': kwargs})

    def __getattr__(self, name):
        """Return the calls recorded for the hook ``name``.

        Raises AttributeError for special (dunder) names and for ``calls``.
        """
        # __getattr__ only runs when regular lookup failed. Answering for
        # dunder names would make protocol probes such as
        # getattr(obj, '__deepcopy__', None) receive a list instead of a
        # miss, and answering for 'calls' on an instance whose __init__ has
        # not run would recurse forever.
        if name == 'calls' or (name.startswith('__') and name.endswith('__')):
            raise AttributeError(name)
        return self.calls.get(name, [])

    def clear(self):
        """Forget every recorded call."""
        self.calls = {}
@pytest.fixture
def hooks(settings):
    """Install an ``AllHook`` under ``A2_HOOKS['__all__']`` and yield it.

    ``A2_HOOKS`` is created as an empty dict when the settings do not define
    it. After the test the recorded calls are cleared and the ``__all__``
    entry is removed.
    """
    if not hasattr(settings, 'A2_HOOKS'):
        registry = {}
        settings.A2_HOOKS = registry
    else:
        registry = settings.A2_HOOKS
    recorder = AllHook()
    registry['__all__'] = recorder
    yield recorder
    recorder.clear()
    del settings.A2_HOOKS['__all__']
@pytest.fixture
def auto_admin_role(db, ou1):
    """Create the role ``auto-admin-role`` in ``ou1`` with self administration."""
    attributes = {
        'ou': ou1,
        'slug': 'auto-admin-role',
        'name': 'Auto Admin Role',
    }
    self_admin_role = Role.objects.create(**attributes)
    self_admin_role.add_self_administration()
    return self_admin_role
@pytest.fixture
def user_with_auto_admin_role(auto_admin_role, ou1):
    """Return a user of ``ou1`` that holds ``auto_admin_role``."""
    attributes = dict(
        username='user.with.auto.admin.role',
        first_name=u'User',
        last_name=u'With Auto Admin Role',
        email='user.with.auto.admin.role@example.net',
        ou=ou1,
    )
    role_holder = create_user(**attributes)
    role_holder.roles.add(auto_admin_role)
    return role_holder
# fixtures to check proper validation of redirect_url
@pytest.fixture
def saml_external_redirect(db):
    """Register a SAML provider and return ``(next_url, True)``.

    The second item flags the URL as one expected to be accepted.
    """
    from authentic2.saml import models as saml_models

    provider_attributes = dict(
        entity_id='https://saml.example.com/',
        protocol_conformance=3,
        metadata=utils.saml_sp_metadata('https://example.com'),
    )
    saml_models.LibertyProvider.objects.create(**provider_attributes)
    return 'https://saml.example.com/whatever/', True
@pytest.fixture
def invalid_external_redirect():
    """Return ``(next_url, False)`` for a URL expected to be refused."""
    next_url = 'https://invalid.example.com/whatever/'
    return next_url, False
@pytest.fixture
def whitelist_external_redirect(settings):
    """Whitelist a base URL in the settings and return ``(next_url, True)``."""
    whitelist = ['https://whitelist.example.com/']
    settings.A2_REDIRECT_WHITELIST = whitelist
    next_url = 'https://whitelist.example.com/whatever/'
    return next_url, True
@pytest.fixture(params=['saml', 'invalid', 'whitelist'])
def external_redirect(request, saml_external_redirect,
                      invalid_external_redirect, whitelist_external_redirect):
    """Parametrized fixture returning each ``(next_url, valid)`` pair in turn."""
    candidates = {
        'saml': saml_external_redirect,
        'invalid': invalid_external_redirect,
        'whitelist': whitelist_external_redirect,
    }
    return candidates[request.param]
@pytest.fixture
def external_redirect_next_url(external_redirect):
    """Return only the URL part of the ``external_redirect`` pair."""
    next_url = external_redirect[0]
    return next_url
@pytest.fixture
def assert_external_redirect(external_redirect):
    """Return a checker asserting where a response redirects to.

    For a valid external URL, that URL must end with the response's
    ``Location``. Otherwise ``default_return``, resolved against
    ``http://testserver/``, must end with it.
    """
    next_url, valid = external_redirect

    def check_location(response, default_return):
        if valid:
            expected = next_url
        else:
            expected = urlparse.urljoin('http://testserver/', default_return)
        assert expected.endswith(response['Location'])

    return check_location
@pytest.fixture
def french_translation():
    """Activate the ``fr`` translation for the test, then deactivate it."""
    from django.utils import translation

    translation.activate('fr')
    yield
    translation.deactivate()
@pytest.fixture(autouse=True)
def media(settings, tmpdir):
    """Point ``MEDIA_ROOT`` at a fresh ``media`` directory under ``tmpdir``."""
    media_dir = tmpdir.mkdir('media')
    settings.MEDIA_ROOT = str(media_dir)
@pytest.fixture
def service(db):
    """Create and return the service ``service`` in the default OU."""
    attributes = {
        'ou': get_default_ou(),
        'slug': 'service',
        'name': 'Service',
    }
    return Service.objects.create(**attributes)
@pytest.fixture()
def migration(request, transactional_db):
    """
    This fixture returns a helper object to test Django data migrations.

    The fixture returns an object with two methods:

     - `before` to initialize the db to the state before the migration under test
     - `apply` to execute the migration and bring the db to the state after it

    The methods return `old_apps` and `new_apps` respectively; these can
    be used to initiate the ORM models as in the migrations themselves.

    For example:

        def test_foo_set_to_bar(migration):
            old_apps = migration.before([('my_app', '0001_initial')])
            Foo = old_apps.get_model('my_app', 'foo')
            Foo.objects.create(bar=False)
            assert Foo.objects.count() == 1
            assert Foo.objects.filter(bar=False).count() == Foo.objects.count()
            # executing migration
            new_apps = migration.apply([('my_app', '0002_set_foo_bar')])
            Foo = new_apps.get_model('my_app', 'foo')
            assert Foo.objects.filter(bar=False).count() == 0
            assert Foo.objects.filter(bar=True).count() == Foo.objects.count()

    Once the test is over, all migrations are applied again.

    Based on: https://gist.github.com/blueyed/4fb0a807104551f103e6
    """
    # see https://gist.github.com/asfaltboy/b3e6f9b5d95af8ba2cc46f2ba6eae5e2
    if django.VERSION < (1, 9):
        pytest.skip('migration fixture only works with Django 1.9')

    class Migrator(object):
        def _migrate(self, targets):
            """ Migrate to "targets" and return the resulting app registry """
            executor = MigrationExecutor(connection)
            executor.migrate(targets)
            # Rebuild the graph so the project state reflects what was applied.
            executor.loader.build_graph()
            return executor._create_project_state(with_applied_migrations=True).apps

        def before(self, targets, at_end=True):
            """ Specify app and starting migration names as in:
                before([('app', '0001_before')]) => app/migrations/0001_before.py

                "at_end" is accepted for compatibility and is not used.
            """
            return self._migrate(targets)

        def apply(self, targets):
            """ Migrate forwards to the "targets" migration """
            return self._migrate(targets)

    yield Migrator()
    # Bring the database back to the fully migrated state.
    call_command('migrate', verbosity=0)