# -*- coding: utf-8 -*- import os import tempfile import pytest import shutil import json import mock from requests import RequestException from django.core.management import call_command from django.db import connection from hobo.multitenant.middleware import TenantMiddleware from authentic2.data_transfer import export_site os.sys.path.append('%s/tests' % os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from utils import byteify pytestmark = pytest.mark.django_db @pytest.fixture def skeleton_dir(request, settings): settings.HOBO_SKELETONS_DIR = tempfile.mkdtemp('skeletons') def fin(): shutil.rmtree(settings.HOBO_SKELETONS_DIR) request.addfinalizer(fin) return settings.HOBO_SKELETONS_DIR def test_hobo_deploy(monkeypatch, tenant_base, mocker, skeleton_dir): from django.core.management import call_command from django.conf import settings from hobo.agent.authentic2.management.commands.hobo_deploy import Command as HoboDeployCommand # Create skeleton roles.json os.makedirs(os.path.join(skeleton_dir, 'commune', 'wcs')) with open(os.path.join(skeleton_dir, 'commune', 'wcs', 'roles.json'), 'w') as roles_json: json.dump([ { 'model': 'a2_rbac.role', 'fields': { 'name': u'Service petite enfance', 'slug': u'service-petite-enfance', }, }, { 'model': 'a2_rbac.role', 'fields': { 'name': u'Service état-civil', 'slug': u'service-etat-civil', }, }, ], roles_json) # As a user is created, notify_agents is called, as celery is not running # we just block it mocker.patch('hobo.agent.authentic2.provisionning.notify_agents') requests_get = mocker.patch('requests.get') meta1 = ''' nJpkBznHNbvE+RAC6mU+NPQnIWs8gFNCm6I3FPcUKYpaJbXaurJ4cJgvnaEiqIXPQDcbHxuLeCbYbId9yascWZirvQbh8d/r+Vv+24bPG++9gW+i3Nnz1VW8V+z0b+puHWvM/FjJjBNJgWkI38gaupz47U6/02CtWx00stitiwk= AQAB 3BxSiAzGvY1Yuqa31L7Zr2WHM/8cn5oX+Q6A2SYgzjuvAgnWyizN8YgW/fHR4G7MtkmZ5RFJLXfcSLwbUfpFHV6KO1ikbgViYuFempM+SWtjqEI7ribm9GaI5kUzHJZBrH3/Q9XAd9/GLLALxurGjbKDeLfc0D+7el26g4sYmA8= AQAB ''' meta2 = meta1.replace('eservices', 'passerelle') meta3 = meta1.replace('eservices', 'clapiers') metadatas = [meta1, meta2, meta3] monkeypatch.setattr(HoboDeployCommand, 'backoff_factor', 0.0001) side_effect_iter = iter([meta1, meta2, RequestException(), meta3]) def side_effect(*args, **kwargs): v = next(side_effect_iter) if isinstance(v, Exception): raise v m = mock.Mock() m.text = v return m requests_get.side_effect = side_effect env = { 'users': [ { 'username': 'john.doe', 'first_name': 'John', 'last_name': 'Doe', 'email': 'john.doe@example.net', 'password': 'password', }, ], 'profile': { 'fields': [ { 'kind': 'title', 'description': '', 'required': False, 'user_visible': True, 'label': u'Civilité', 'disabled': False, 'user_editable': True, 'asked_on_registration': False, 'name': 'title' }, { 'kind': 'string', 'description': '', 'required': True, 'user_visible': True, 'label': u'Prénom', 'disabled': False, 'user_editable': True, 'asked_on_registration': True, 'name': 'first_name' }, { 'kind': 'string', 'description': '', 'required': True, 'user_visible': True, 'label': 'Nom', 'disabled': False, 'user_editable': True, 'asked_on_registration': True, 'name': 'last_name' }, { 'kind': 'email', 'description': '', 'required': True, 'user_visible': True, 'label': u'Adresse électronique', 'disabled': False, 'user_editable': True, 'asked_on_registration': False, 'name': 'email' }, { 'kind': 'string', 'description': '', 'required': False, 'user_visible': True, 'label': 'Addresse', 'disabled': False, 'user_editable': True, 'asked_on_registration': False, 'name': 'address' }, { 'kind': 'string', 'description': '', 'required': False, 'user_visible': True, 'label': 'Code postal', 'disabled': False, 'user_editable': True, 'asked_on_registration': False, 'name': 'zipcode' }, { 'kind': 'string', 'description': '', 'required': False, 'user_visible': True, 'label': 'Commune', 'disabled': False, 'user_editable': True, 'asked_on_registration': False, 'name': 'city' }, { 'kind': 'string', 'description': '', 'required': False, 'user_visible': True, 'label': u'Téléphone', 'disabled': False, 'user_editable': True, 'asked_on_registration': False, 'name': 'phone' }, { 'kind': 'string', 'description': '', 'required': False, 'user_visible': True, 'label': 'Mobile', 'disabled': False, 'user_editable': True, 'asked_on_registration': False, 'name': 'mobile' }, { 'kind': 'string', 'description': '', 'required': False, 'user_visible': True, 'label': 'Pays', 'disabled': True, 'user_editable': True, 'asked_on_registration': False, 'name': 'country' }, { 'kind': 'birthdate', 'description': '', 'required': False, 'user_visible': True, 'label': 'Date de naissance', 'disabled': True, 'user_editable': True, 'asked_on_registration': False, 'name': 'birthdate' } ] }, 'variables': { 'hobo_test_variable': True, 'other_variable': 'foo', }, 'services': [ { 'service-id': 'authentic', 'slug': 'test', 'title': 'Test', 'this': True, 'secret_key': '12345', 'base_url': 'http://sso.example.net', 'variables': { 'other_variable': 'bar', } }, { 'service-id': 'wcs', 'template_name': 'commune', 'slug': 'montpellier-metropole', 'title': u'Montpellier-Métropole', 'base_url': 'http://eservices.example.net', 'saml-sp-metadata-url': 'http://eservices.example.net/saml/metadata', }, { 'service-id': 'passerelle', 'slug': 'passerelle', 'title': u'Passerelle', 'base_url': 'http://passerelle.example.net', 'saml-sp-metadata-url': 'http://passerelle.example.net/saml/metadata', }, { 'service-id': 'wcs', 'template_name': 'commune', 'slug': 'clapiers', 'title': 'Clapiers', 'base_url': 'http://clapiers.example.net', 'saml-sp-metadata-url': 'http://clapiers.example.net/saml/metadata', }, ] } hobo_json_content = json.dumps(env) hobo_json = tempfile.NamedTemporaryFile() hobo_json.write(hobo_json_content) hobo_json.flush() call_command('hobo_deploy', 'http://sso.example.net', hobo_json.name) from hobo.multitenant.middleware import TenantMiddleware tenants = list(TenantMiddleware.get_tenants()) assert len(tenants) == 1 tenant = tenants[0] assert tenant.domain_url == 'sso.example.net' assert tenant.schema_name == 'sso_example_net' tenant_directory = tenant.get_directory() assert tenant_directory == os.path.join(tenant_base, tenant.domain_url) assert os.path.exists(os.path.join(tenant_directory, 'saml.crt')) assert os.path.exists(os.path.join(tenant_directory, 'saml.key')) from tenant_schemas.utils import tenant_context with tenant_context(tenant): from authentic2.custom_user.models import User assert User.objects.count() == 1 user = User.objects.get() assert user.username == 'john.doe' assert user.first_name == 'John' assert user.last_name == 'Doe' assert user.email == 'john.doe@example.net' assert user.is_superuser is True assert user.is_staff is True from authentic2.models import Attribute assert Attribute.all_objects.count() == 10 for field in env['profile']['fields']: if field['name'] != 'email': at = Attribute.all_objects.get(name=field['name']) assert at.label == field['label'] assert at.description == field['description'] assert at.kind == field['kind'] if field['disabled']: assert at.disabled is True else: assert at.disabled is False assert at.asked_on_registration == \ field['asked_on_registration'] assert at.user_visible == field['user_visible'] assert at.user_editable == field['user_editable'] for at in Attribute.all_objects.all(): assert [field for field in env['profile']['fields'] if field['name'] == at.name] # OIDC checks from authentic2_idp_oidc.utils import get_jwkset assert get_jwkset() # SAML checks from authentic2.saml.models import (SPOptionsIdPPolicy, LibertyProvider, LibertyServiceProvider) assert SPOptionsIdPPolicy.objects.count() == 1 policy = SPOptionsIdPPolicy.objects.get() assert policy.name == 'Default' assert policy.enabled is True assert policy.authn_request_signed is False assert policy.accepted_name_id_format == ['uuid'] assert policy.default_name_id_format == 'uuid' assert policy.attributes.count() == 14 assert policy.attributes.filter(enabled=True).count() == 12 assert policy.attributes.filter(enabled=False).count() == 2 assert policy.attributes.filter(name_format='basic').count() == 14 assert policy.attributes.filter(name='is_superuser', attribute_name='django_user_is_superuser') \ .count() == 1 assert policy.attributes.filter( name='username', attribute_name='django_user_username').count() == 1 for field in env['profile']['fields']: assert policy.attributes.filter( name=field['name'], attribute_name='django_user_%s' % field['name']).count() == 1 assert LibertyProvider.objects.count() == 3 services = tenant.get_hobo_json()['services'] from authentic2.a2_rbac.utils import get_default_ou from authentic2.a2_rbac.models import Role other_services = [service for service in services if not service.get('this')] for meta, service in zip(metadatas, other_services): provider = LibertyProvider.objects.get(slug=service['slug']) assert provider.metadata == meta # Two services loaded roles.json, so there must be 4 roles associated # to their "ou" excluding their superuser role, 2 admin roles for users # and roles, a 2 loaded roles, petite enfance and état-civil assert Role.objects.filter(ou__isnull=False, service__isnull=True).count() == 8 for service_id in set(s['service-id'] for s in other_services): same_services = [s for s in other_services if s['service-id'] == service_id] for i, service in enumerate(same_services): assert LibertyProvider.objects.filter(slug=service['slug']) \ .count() == 1 provider = LibertyProvider.objects.get(slug=service['slug']) assert provider.name == service['title'] assert provider.federation_source == 'hobo' assert provider.entity_id == service['saml-sp-metadata-url'] assert LibertyServiceProvider.objects.filter( liberty_provider=provider).count() == 1 service_provider = LibertyServiceProvider.objects.get( liberty_provider=provider) assert service_provider.enabled is True assert service_provider.sp_options_policy == policy assert service_provider.users_can_manage_federations is False assert Role.objects.filter(slug='_a2-hobo-superuser', service=provider).count() == 1 su_role = Role.objects.get(slug='_a2-hobo-superuser', service=provider) assert su_role.attributes.count() == 1 assert su_role.attributes.filter( name='is_superuser', kind='string', value='true').count() == 1 if i == 0 or service_id != 'wcs': assert provider.ou == get_default_ou() else: assert provider.ou.name == service['title'] assert su_role.ou == provider.ou assert provider.attributes.count() == 2 assert provider.attributes.filter( name='is_superuser', name_format='basic', attribute_name='is_superuser').count() == 1 assert provider.attributes.filter( name='role-slug', name_format='basic', attribute_name='a2_service_ou_role_uuids').count() == 1 if service.get('template_name'): assert Role.objects.filter( ou=provider.ou, service__isnull=True).count() == 4 assert Role.objects.filter( ou=provider.ou, service__isnull=True, name=u'Service petite enfance').count() == 1 assert Role.objects.filter( ou=provider.ou, service__isnull=True, name=u'Service état-civil').count() == 1 def test_import_template(db, tenant_base): def with_uuid_removed(input): if isinstance(input, dict): for key in input.keys(): if key == 'uuid': input.pop('uuid') return {k: with_uuid_removed(v) for k, v in input.iteritems()} elif isinstance(input, list): return [with_uuid_removed(e) for e in input] else: return input def with_lists_sorted(input): if isinstance(input, dict): return {k: with_lists_sorted(v) for k, v in input.iteritems()} if isinstance(input, list): return with_lists_sorted(input.sort()) else: return input call_command('create_tenant', 'authentic.example.net') tenant = TenantMiddleware.get_tenant_by_hostname('authentic.example.net') connection.set_tenant(tenant) call_command('import_template', '--basepath=%s' % os.path.dirname(__file__), 'data_authentic_export_site') content = open('%s/data_authentic_export_site.json' % os.path.dirname(__file__)).read() assert byteify(with_lists_sorted(with_uuid_removed(export_site()))) == byteify(with_lists_sorted(with_uuid_removed(json.loads(content))))