create 'import_site' and 'export_site' commands (#16514)

This commit is contained in:
Emmanuel Cazenave 2018-04-09 14:10:21 +02:00
parent a638275c09
commit 17dd1b2338
9 changed files with 1188 additions and 1 deletions

View File

@ -92,6 +92,15 @@ class OrganizationalUnit(OrganizationalUnitAbstractBase):
def cached(cls):
return cls.objects.all()
def export_json(self):
return {
'uuid': self.uuid, 'slug': self.slug, 'name': self.name,
'description': self.description, 'default': self.default,
'email_is_unique': self.email_is_unique,
'username_is_unique': self.username_is_unique,
'validate_emails': self.validate_emails
}
OrganizationalUnit._meta.natural_key = [['uuid'], ['slug'], ['name']]
@ -213,6 +222,29 @@ class Role(RoleAbstractBase):
'ou__slug': self.ou.slug if self.ou else None,
}
def export_json(self, attributes=False, parents=False, permissions=False):
d = {
'uuid': self.uuid, 'slug': self.slug, 'name': self.name,
'description': self.description, 'external_id': self.external_id,
'ou': self.ou and self.ou.natural_key_json(),
'service': self.service and self.service.natural_key_json()
}
if attributes:
for attribute in self.attributes.all():
d.setdefault('attributes', []).append(attribute.to_json())
if parents:
RoleParenting = rbac_utils.get_role_parenting_model()
for parenting in RoleParenting.objects.filter(child_id=self.id, direct=True):
d.setdefault('parents', []).append(parenting.parent.natural_key_json())
if permissions:
for perm in self.permissions.all():
d.setdefault('permissions', []).append(perm.export_json())
return d
Role._meta.natural_key = [
['uuid'], ['slug', 'ou'], ['name', 'ou'], ['slug', 'service'], ['name', 'service']
@ -250,6 +282,10 @@ class RoleAttribute(models.Model):
('role', 'name', 'kind', 'value'),
)
def to_json(self):
return {'name': self.name, 'kind': self.kind, 'value': self.value}
GenericRelation(Permission,
content_type_field='target_ct',
object_id_field='target_id').contribute_to_class(ContentType, 'admin_perms')

View File

@ -0,0 +1,281 @@
from django.contrib.contenttypes.models import ContentType
from django_rbac.models import Operation
from django_rbac.utils import (
get_ou_model, get_role_model, get_role_parenting_model, get_permission_model)
from authentic2.a2_rbac.models import RoleAttribute
from authentic2.utils import update_model
def export_site():
return {
'roles': export_roles(get_role_model().objects.all()),
'ous': export_ou(get_ou_model().objects.all())
}
def export_ou(ou_query_set):
return [ou.export_json() for ou in ou_query_set]
def export_roles(role_queryset):
""" Serialize roles in role_queryset
"""
return [
role.export_json(attributes=True, parents=True, permissions=True)
for role in role_queryset
]
def search_ou(ou_d):
try:
OU = get_ou_model()
return OU.objects.get_by_natural_key_json(ou_d)
except OU.DoesNotExist:
return None
def search_role(role_d):
Role = get_role_model()
try:
Role = get_role_model()
return Role.objects.get_by_natural_key_json(role_d)
except Role.DoesNotExist:
return None
class ImportContext(object):
""" Holds information on how to perform the import.
ou_delete_orphans: if True any existing ou that is not found in the export will
be deleted
role_delete_orphans: if True any existing role that is not found in the export will
be deleted
role_attributes_update: for each role in the import data,
attributes will deleted and re-created
role_parentings_update: for each role in the import data,
parentings will deleted and re-created
role_permissions_update: for each role in the import data,
permissions will deleted and re-created
"""
def __init__(
self, role_delete_orphans=False, role_parentings_update=True,
role_permissions_update=True, role_attributes_update=True,
ou_delete_orphans=False):
self.role_delete_orphans = role_delete_orphans
self.ou_delete_orphans = ou_delete_orphans
self.role_parentings_update = role_parentings_update
self.role_permissions_update = role_permissions_update
self.role_attributes_update = role_attributes_update
class DataImportError(Exception):
pass
class RoleDeserializer(object):
def __init__(self, d, import_context):
self._import_context = import_context
self._obj = None
self._parents = None
self._attributes = None
self._permissions = None
self._role_d = dict()
for key, value in d.items():
if key == 'parents':
self._parents = value
elif key == 'attributes':
self._attributes = value
elif key == 'permissions':
self._permissions = value
else:
self._role_d[key] = value
def deserialize(self):
ou_d = self._role_d['ou']
has_ou = bool(ou_d)
ou = None if not has_ou else search_ou(ou_d)
if has_ou and not ou:
raise DataImportError(
"Can't import role because missing Organizational Unit : "
"%s" % ou_d)
kwargs = self._role_d.copy()
del kwargs['ou']
del kwargs['service']
if has_ou:
kwargs['ou'] = ou
obj = search_role(self._role_d)
if obj: # Role already exist
self._obj = obj
status = 'updated'
update_model(self._obj, kwargs)
else: # Create role
self._obj = get_role_model().objects.create(**kwargs)
status = 'created'
# Ensure admin role is created.
# Absoluteley necessary to create
# parentings relationship later on,
# since we don't deserialize technical role.
self._obj.get_admin_role()
return self._obj, status
def attributes(self):
""" Update attributes (delete everything then create)
"""
created, deleted = [], []
for attr in self._obj.attributes.all():
attr.delete()
deleted.append(attr)
# Create attributes
if self._attributes:
for attr_dict in self._attributes:
attr_dict['role'] = self._obj
created.append(RoleAttribute.objects.create(**attr_dict))
return created, deleted
def parentings(self):
""" Update parentings (delete everything then create)
"""
created, deleted = [], []
Parenting = get_role_parenting_model()
for parenting in Parenting.objects.filter(child=self._obj, direct=True):
parenting.delete()
deleted.append(parenting)
if self._parents:
for parent_d in self._parents:
parent = search_role(parent_d)
if not parent:
raise DataImportError("Could not find role : %s" % parent_d)
created.append(Parenting.objects.create(
child=self._obj, direct=True, parent=parent))
return created, deleted
def permissions(self):
""" Update permissions (delete everything then create)
"""
created, deleted = [], []
for perm in self._obj.permissions.all():
perm.delete()
deleted.append(perm)
self._obj.permissions.clear()
if self._permissions:
for perm in self._permissions:
op = Operation.objects.get_by_natural_key_json(perm['operation'])
ou = get_ou_model().objects.get_by_natural_key_json(
perm['ou']) if perm['ou'] else None
ct = ContentType.objects.get_by_natural_key_json(perm['target_ct'])
target = ct.model_class().objects.get_by_natural_key_json(perm['target'])
perm = get_permission_model().objects.create(
operation=op, ou=ou, target_ct=ct, target_id=target.pk)
self._obj.permissions.add(perm)
created.append(perm)
return created, deleted
class ImportResult(object):
def __init__(self):
self.roles = {'created': [], 'updated': []}
self.ous = {'created': [], 'updated': []}
self.attributes = {'created': [], 'deleted': []}
self.parentings = {'created': [], 'deleted': []}
self.permissions = {'created': [], 'deleted': []}
def update_roles(self, role, d_status):
self.roles[d_status].append(role)
def update_ous(self, ou, status):
self.ous[status].append(ou)
def _bulk_update(self, attrname, created, deleted):
attr = getattr(self, attrname)
attr['created'].extend(created)
attr['deleted'].extend(deleted)
def update_attributes(self, created, deleted):
self._bulk_update('attributes', created, deleted)
def update_parentings(self, created, deleted):
self._bulk_update('parentings', created, deleted)
def update_permissions(self, created, deleted):
self._bulk_update('permissions', created, deleted)
def to_str(self, verbose=False):
res = ""
for attr in ('roles', 'ous', 'parentings', 'permissions', 'attributes'):
data = getattr(self, attr)
for status in ('created', 'updated', 'deleted'):
if status in data:
s_data = data[status]
res += "%s %s %s\n" % (len(s_data), attr, status)
return res
def import_ou(ou_d):
OU = get_ou_model()
# ou = search_ou([ou_d['slug']])
ou = search_ou(ou_d)
if ou is None:
ou = OU.objects.create(**ou_d)
status = 'created'
else:
update_model(ou, ou_d)
status = 'updated'
# Ensure admin role is created
ou.get_admin_role()
return ou, status
def import_site(json_d, import_context):
result = ImportResult()
for ou_d in json_d.get('ous', []):
result.update_ous(*import_ou(ou_d))
roles_ds = [RoleDeserializer(role_d, import_context) for role_d in json_d.get('roles', [])
if not role_d['slug'].startswith('_')]
for ds in roles_ds:
result.update_roles(*ds.deserialize())
if import_context.role_attributes_update:
for ds in roles_ds:
result.update_attributes(*ds.attributes())
if import_context.role_parentings_update:
for ds in roles_ds:
result.update_parentings(*ds.parentings())
if import_context.role_permissions_update:
for ds in roles_ds:
result.update_permissions(*ds.permissions())
if import_context.ou_delete_orphans:
raise DataImportError(
"Unsupported context value for ou_delete_orphans : %s" % (
import_context.ou_delete_orphans))
if import_context.role_delete_orphans:
# FIXME : delete each role that is in DB but not in the export
raise DataImportError(
"Unsupported context value for role_delete_orphans : %s" % (
import_context.role_delete_orphans))
return result

View File

@ -0,0 +1,24 @@
import json
import sys
from django.core.management.base import BaseCommand
from authentic2.data_transfer import export_site
from django_rbac.utils import get_role_model
class Command(BaseCommand):
help = 'Export site'
def add_arguments(self, parser):
parser.add_argument('--output', metavar='FILE', default=None,
help='name of a file to write output to')
def handle(self, *args, **options):
if options['output']:
output, close = open(options['output'], 'w'), True
else:
output, close = sys.stdout, False
json.dump(export_site(), output, indent=4)
if close:
output.close()

View File

@ -0,0 +1,71 @@
import contextlib
import json
import sys
from django.conf import settings
from django.core.management.base import BaseCommand
from django.db import transaction
from django.utils import translation
from authentic2.data_transfer import import_site, ImportContext
class DryRunException(Exception):
pass
def create_context_args(options):
kwargs = {}
if options['option']:
for context_op in options['option']:
context_op = context_op.replace('-', '_')
if context_op.startswith('no_'):
kwargs[context_op[3:]] = False
else:
kwargs[context_op] = True
return kwargs
# Borrowed from https://bugs.python.org/issue10049#msg118599
@contextlib.contextmanager
def provision_contextm(dry_run, settings):
if dry_run and 'hobo.agent.authentic2' in settings.INSTALLED_APPS:
import hobo.agent.authentic2
with hobo.agent.authentic2.provisionning.Provisionning():
yield
else:
yield
class Command(BaseCommand):
help = 'Import site'
def add_arguments(self, parser):
parser.add_argument(
'filename', metavar='FILENAME', type=str, help='name of file to import')
parser.add_argument(
'--dry-run', action='store_true', dest='dry_run', help='Really perform the import')
parser.add_argument(
'-o', '--option', action='append', help='Import context options',
choices=[
'role-delete-orphans', 'ou-delete-orphans', 'no-role-permissions-update',
'no-role-attributes-update', 'no-role-parentings-update'])
def handle(self, filename, **options):
translation.activate(settings.LANGUAGE_CODE)
dry_run = options['dry_run']
msg = "Dry run\n" if dry_run else "Real run\n"
c_kwargs = create_context_args(options)
try:
with open(filename, 'r') as f:
with provision_contextm(dry_run, settings):
with transaction.atomic():
sys.stdout.write(msg)
result = import_site(json.load(f), ImportContext(**c_kwargs))
if dry_run:
raise DryRunException()
except DryRunException:
pass
sys.stdout.write(result.to_str())
sys.stdout.write("Success\n")
translation.deactivate()

View File

@ -1047,3 +1047,8 @@ def send_email_change_email(user, email, request=None, context=None, template_na
context=ctx,
legacy_subject_templates=legacy_subject_templates,
legacy_body_templates=legacy_body_templates)
def update_model(obj, d):
for attr, value in d.items():
setattr(obj, attr, value)

View File

@ -114,6 +114,9 @@ class Operation(models.Model):
def __unicode__(self):
return unicode(_(self.name))
def export_json(self):
return {'slug': self.slug, 'name': self.name}
objects = managers.OperationManager()
@ -145,6 +148,14 @@ class PermissionAbstractBase(models.Model):
self.target and self.target_ct.natural_key(),
self.target and self.target.natural_key()]
def export_json(self):
return {
"operation": self.operation.natural_key_json(),
"ou": self.ou and self.ou.natural_key_json(),
'target_ct': self.target_ct.natural_key_json(),
"target": self.target.natural_key_json()
}
def __unicode__(self):
ct = ContentType.objects.get_for_id(self.target_ct_id)
ct_ct = ContentType.objects.get_for_model(ContentType)

View File

@ -1,7 +1,11 @@
import pytest
from django.contrib.contenttypes.models import ContentType
from django_rbac.utils import get_permission_model
from django_rbac.models import Operation
from authentic2.a2_rbac.models import Role, OrganizationalUnit as OU, RoleAttribute
from authentic2.models import Service
from authentic2.a2_rbac.models import Role, OrganizationalUnit as OU
from authentic2.utils import get_hex_uuid
def test_role_natural_key(db):
@ -24,3 +28,155 @@ def test_role_natural_key(db):
Role.objects.get_by_natural_key(*r2.natural_key())
with pytest.raises(Role.DoesNotExist):
Role.objects.get_by_natural_key(*r4.natural_key())
def test_basic_role_export_json(db):
role = Role.objects.create(
name='basic role', slug='basic-role', description='basic role description')
role_dict = role.export_json()
assert role_dict['name'] == role.name
assert role_dict['slug'] == role.slug
assert role_dict['uuid'] == role.uuid
assert role_dict['description'] == role.description
assert role_dict['external_id'] == role.external_id
assert role_dict['ou'] is None
assert role_dict['service'] is None
def test_role_with_ou_export_json(db):
ou = OU.objects.create(name='ou', slug='ou')
role = Role.objects.create(name='some role', ou=ou)
role_dict = role.export_json()
assert role_dict['ou'] == {'uuid': ou.uuid, 'slug': ou.slug, 'name': ou.name}
def test_role_with_service_export_json(db):
service = Service.objects.create(name='service name', slug='service-name')
role = Role.objects.create(name='some role', service=service)
role_dict = role.export_json()
assert role_dict['service'] == {'slug': service.slug, 'ou': None}
def test_role_with_service_with_ou_export_json(db):
ou = OU.objects.create(name='ou', slug='ou')
service = Service.objects.create(name='service name', slug='service-name', ou=ou)
role = Role.objects.create(name='some role', service=service)
role_dict = role.export_json()
assert role_dict['service'] == {
'slug': service.slug, 'ou': {'uuid': ou.uuid, 'slug': 'ou', 'name': 'ou'}}
def test_role_with_attributes_export_json(db):
role = Role.objects.create(name='some role')
attr1 = RoleAttribute.objects.create(
role=role, name='attr1_name', kind='string', value='attr1_value')
attr2 = RoleAttribute.objects.create(
role=role, name='attr2_name', kind='string', value='attr2_value')
role_dict = role.export_json(attributes=True)
attributes = role_dict['attributes']
assert len(attributes) == 2
expected_attr_names = set([attr1.name, attr2.name])
for attr_dict in attributes:
assert attr_dict['name'] in expected_attr_names
expected_attr_names.remove(attr_dict['name'])
target_attr = RoleAttribute.objects.filter(name=attr_dict['name']).first()
assert attr_dict['kind'] == target_attr.kind
assert attr_dict['value'] == target_attr.value
def test_role_with_parents_export_json(db):
grand_parent_role = Role.objects.create(
name='test grand parent role', slug='test-grand-parent-role')
parent_1_role = Role.objects.create(
name='test parent 1 role', slug='test-parent-1-role')
parent_1_role.add_parent(grand_parent_role)
parent_2_role = Role.objects.create(
name='test parent 2 role', slug='test-parent-2-role')
parent_2_role.add_parent(grand_parent_role)
child_role = Role.objects.create(
name='test child role', slug='test-child-role')
child_role.add_parent(parent_1_role)
child_role.add_parent(parent_2_role)
child_role_dict = child_role.export_json(parents=True)
assert child_role_dict['slug'] == child_role.slug
parents = child_role_dict['parents']
assert len(parents) == 2
expected_slugs = set([parent_1_role.slug, parent_2_role.slug])
for parent in parents:
assert parent['slug'] in expected_slugs
expected_slugs.remove(parent['slug'])
grand_parent_role_dict = grand_parent_role.export_json(parents=True)
assert grand_parent_role_dict['slug'] == grand_parent_role.slug
assert 'parents' not in grand_parent_role_dict
parent_1_role_dict = parent_1_role.export_json(parents=True)
assert parent_1_role_dict['slug'] == parent_1_role.slug
parents = parent_1_role_dict['parents']
assert len(parents) == 1
assert parents[0]['slug'] == grand_parent_role.slug
parent_2_role_dict = parent_2_role.export_json(parents=True)
assert parent_2_role_dict['slug'] == parent_2_role.slug
parents = parent_2_role_dict['parents']
assert len(parents) == 1
assert parents[0]['slug'] == grand_parent_role.slug
def test_role_with_permission_export_json(db):
some_ou = OU.objects.create(name='some ou', slug='some-ou')
role = Role.objects.create(name='role name', slug='role-slug')
other_role = Role.objects.create(
name='other role name', slug='other-role-slug', uuid=get_hex_uuid(), ou=some_ou)
ou = OU.objects.create(name='basic ou', slug='basic-ou', description='basic ou description')
Permission = get_permission_model()
op = Operation.objects.first()
perm_saml = Permission.objects.create(
operation=op, ou=ou,
target_ct=ContentType.objects.get_for_model(ContentType),
target_id=ContentType.objects.get(app_label="saml", model="libertyprovider").pk)
role.permissions.add(perm_saml)
perm_role = Permission.objects.create(
operation=op, ou=None,
target_ct=ContentType.objects.get_for_model(Role),
target_id=other_role.pk)
role.permissions.add(perm_role)
export = role.export_json(permissions=True)
permissions = export['permissions']
assert len(permissions) == 2
assert permissions[0] == {
'operation': {'slug': 'add'},
'ou': {'uuid': ou.uuid, 'slug': ou.slug, 'name': ou.name},
'target_ct': {'app_label': u'contenttypes', 'model': u'contenttype'},
'target': {'model': u'libertyprovider', 'app_label': u'saml'}
}
assert permissions[1] == {
'operation': {'slug': 'add'},
'ou': None,
'target_ct': {'app_label': u'a2_rbac', 'model': u'role'},
'target': {
'slug': u'other-role-slug', 'service': None, 'uuid': other_role.uuid,
'ou': {
'slug': u'some-ou', 'uuid': some_ou.uuid, 'name': u'some ou'
},
'name': u'other role name'}
}
def test_ou_export_json(db):
ou = OU.objects.create(
name='basic ou', slug='basic-ou', description='basic ou description',
username_is_unique=True, email_is_unique=True, default=False, validate_emails=True)
ou_dict = ou.export_json()
assert ou_dict['name'] == ou.name
assert ou_dict['slug'] == ou.slug
assert ou_dict['uuid'] == ou.uuid
assert ou_dict['description'] == ou.description
assert ou_dict['username_is_unique'] == ou.username_is_unique
assert ou_dict['email_is_unique'] == ou.email_is_unique
assert ou_dict['default'] == ou.default
assert ou_dict['validate_emails'] == ou.validate_emails

471
tests/test_data_transfer.py Normal file
View File

@ -0,0 +1,471 @@
import json
from django_rbac.utils import get_role_model, get_ou_model
import py
import pytest
from authentic2.a2_rbac.models import RoleParenting
from authentic2.data_transfer import (
DataImportError, export_roles, import_site, export_ou, ImportContext,
RoleDeserializer, search_role, import_ou)
from authentic2.utils import get_hex_uuid
Role = get_role_model()
OU = get_ou_model()
def test_export_basic_role(db):
role = Role.objects.create(name='basic role', slug='basic-role', uuid=get_hex_uuid())
query_set = Role.objects.filter(uuid=role.uuid)
roles = export_roles(query_set)
assert len(roles) == 1
role_dict = roles[0]
for key, value in role.export_json().items():
assert role_dict[key] == value
def test_export_role_with_parents(db):
grand_parent_role = Role.objects.create(
name='test grand parent role', slug='test-grand-parent-role', uuid=get_hex_uuid())
parent_1_role = Role.objects.create(
name='test parent 1 role', slug='test-parent-1-role', uuid=get_hex_uuid())
parent_1_role.add_parent(grand_parent_role)
parent_2_role = Role.objects.create(
name='test parent 2 role', slug='test-parent-2-role', uuid=get_hex_uuid())
parent_2_role.add_parent(grand_parent_role)
child_role = Role.objects.create(
name='test child role', slug='test-child-role', uuid=get_hex_uuid())
child_role.add_parent(parent_1_role)
child_role.add_parent(parent_2_role)
query_set = Role.objects.filter(slug__startswith='test').order_by('slug')
roles = export_roles(query_set)
assert len(roles) == 4
child_role_dict = roles[0]
assert child_role_dict['slug'] == child_role.slug
parents = child_role_dict['parents']
assert len(parents) == 2
expected_slugs = set([parent_1_role.slug, parent_2_role.slug])
for parent in parents:
assert parent['slug'] in expected_slugs
expected_slugs.remove(parent['slug'])
grand_parent_role_dict = roles[1]
assert grand_parent_role_dict['slug'] == grand_parent_role.slug
parent_1_role_dict = roles[2]
assert parent_1_role_dict['slug'] == parent_1_role.slug
parents = parent_1_role_dict['parents']
assert len(parents) == 1
assert parents[0]['slug'] == grand_parent_role.slug
parent_2_role_dict = roles[3]
assert parent_2_role_dict['slug'] == parent_2_role.slug
parents = parent_2_role_dict['parents']
assert len(parents) == 1
assert parents[0]['slug'] == grand_parent_role.slug
def test_export_ou(db):
ou = OU.objects.create(name='ou name', slug='ou-slug', description='ou description')
ous = export_ou(OU.objects.filter(name='ou name'))
assert len(ous) == 1
ou_d = ous[0]
assert ou_d['name'] == ou.name
assert ou_d['slug'] == ou.slug
assert ou_d['description'] == ou.description
def test_search_role_by_uuid(db):
uuid = get_hex_uuid()
role_d = {'uuid': uuid, 'slug': 'role-slug'}
role = Role.objects.create(**role_d)
assert role == search_role({'uuid': uuid, 'slug': 'other-role-slug'})
def test_search_role_by_slug(db):
role_d = {'uuid': get_hex_uuid(), 'slug': 'role-slug'}
role = Role.objects.create(**role_d)
assert role == search_role({
'uuid': get_hex_uuid(), 'slug': 'role-slug',
'ou': None, 'service': None})
def test_search_role_not_found(db):
assert search_role(
{
'uuid': get_hex_uuid(), 'slug': 'role-slug', 'name': 'role name',
'ou': None, 'service': None}) is None
def test_search_role_slug_not_unique(db):
role1_d = {'uuid': get_hex_uuid(), 'slug': 'role-slug', 'name': 'role name'}
role2_d = {'uuid': get_hex_uuid(), 'slug': 'role-slug', 'name': 'role name'}
ou = OU.objects.create(name='some ou', slug='some-ou')
role1 = Role.objects.create(ou=ou, **role1_d)
Role.objects.create(**role2_d)
assert role1 == search_role(role1.export_json())
def test_role_deserializer(db):
rd = RoleDeserializer({
'name': 'some role', 'description': 'some role description', 'slug': 'some-role',
'uuid': get_hex_uuid(), 'ou': None, 'service': None}, ImportContext())
assert rd._parents is None
assert rd._attributes is None
assert rd._obj is None
role, status = rd.deserialize()
assert status == 'created'
assert role.name == 'some role'
assert role.description == 'some role description'
assert role.slug == 'some-role'
assert rd._obj == role
def test_role_deserializer_with_ou(db):
ou = OU.objects.create(name='some ou', slug='some-ou')
rd = RoleDeserializer({
'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'some role description',
'slug': 'some-role', 'ou': {'slug': 'some-ou'}, 'service': None}, ImportContext())
role, status = rd.deserialize()
assert role.ou == ou
def test_role_deserializer_missing_ou(db):
rd = RoleDeserializer({
'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'role description',
'slug': 'some-role', 'ou': {'slug': 'some-ou'}, 'service': None},
ImportContext())
with pytest.raises(DataImportError):
rd.deserialize()
def test_role_deserializer_update_ou(db):
ou1 = OU.objects.create(name='ou 1', slug='ou-1')
ou2 = OU.objects.create(name='ou 2', slug='ou-2')
uuid = get_hex_uuid()
existing_role = Role.objects.create(uuid=uuid, slug='some-role', ou=ou1)
rd = RoleDeserializer({
'uuid': uuid, 'name': 'some-role', 'slug': 'some-role',
'ou': {'slug': 'ou-2'}, 'service': None}, ImportContext())
role, status = rd.deserialize()
assert role == existing_role
assert role.ou == ou2
def test_role_deserializer_update_fields(db):
uuid = get_hex_uuid()
existing_role = Role.objects.create(uuid=uuid, slug='some-role', name='some role')
rd = RoleDeserializer({
'uuid': uuid, 'slug': 'some-role', 'name': 'some role changed',
'ou': None, 'service': None}, ImportContext())
role, status = rd.deserialize()
assert role == existing_role
assert role.name == 'some role changed'
def test_role_deserializer_with_attributes(db):
attributes_data = {
'attr1_name': dict(name='attr1_name', kind='string', value='attr1_value'),
'attr2_name': dict(name='attr2_name', kind='string', value='attr2_value')
}
rd = RoleDeserializer({
'uuid': get_hex_uuid(), 'name': 'some role', 'description': 'some role description',
'slug': 'some-role', 'attributes': list(attributes_data.values()),
'ou': None, 'service': None}, ImportContext())
role, status = rd.deserialize()
created, deleted = rd.attributes()
assert role.attributes.count() == 2
assert len(created) == 2
for attr in created:
attr_dict = attributes_data[attr.name]
assert attr_dict['name'] == attr.name
assert attr_dict['kind'] == attr.kind
assert attr_dict['value'] == attr.value
del attributes_data[attr.name]
def test_role_deserializer_creates_admin_role(db):
role_dict = {
'name': 'some role', 'slug': 'some-role', 'uuid': get_hex_uuid(),
'ou': None, 'service': None}
rd = RoleDeserializer(role_dict, ImportContext())
rd.deserialize()
Role.objects.get(slug='_a2-managers-of-role-some-role')
def test_role_deserializer_parenting_existing_parent(db):
parent_role_dict = {
'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(),
'ou': None, 'service': None}
parent_role = Role.objects.create(**parent_role_dict)
child_role_dict = {
'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict],
'uuid': get_hex_uuid(), 'ou': None, 'service': None}
rd = RoleDeserializer(child_role_dict, ImportContext())
child_role, status = rd.deserialize()
created, deleted = rd.parentings()
assert len(created) == 1
parenting = created[0]
assert parenting.direct is True
assert parenting.parent == parent_role
assert parenting.child == child_role
def test_role_deserializer_parenting_non_existing_parent(db):
parent_role_dict = {
'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(),
'ou': None, 'service': None}
child_role_dict = {
'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict],
'uuid': get_hex_uuid(), 'ou': None, 'service': None}
rd = RoleDeserializer(child_role_dict, ImportContext())
rd.deserialize()
with pytest.raises(DataImportError) as excinfo:
rd.parentings()
assert "Could not find role" in str(excinfo.value)
def test_role_deserializer_permissions(db):
ou = OU.objects.create(slug='some-ou')
other_role_dict = {
'name': 'other role', 'slug': 'other-role-slug', 'uuid': get_hex_uuid(), 'ou': ou}
other_role = Role.objects.create(**other_role_dict)
other_role_dict['permisison'] = {
"operation": {
"slug": "admin"
},
"ou": {
"slug": "default",
"name": "Collectivit\u00e9 par d\u00e9faut"
},
'target_ct': {'app_label': u'a2_rbac', 'model': u'role'},
"target": {
"slug": "role-deux",
"ou": {
"slug": "default",
"name": "Collectivit\u00e9 par d\u00e9faut"
},
"service": None,
"name": "role deux"
}
}
some_role_dict = {
'name': 'some role', 'slug': 'some-role', 'uuid': get_hex_uuid(),
'ou': None, 'service': None}
some_role_dict['permissions'] = [
{
'operation': {'slug': 'add'},
'ou': None,
'target_ct': {'app_label': u'a2_rbac', 'model': u'role'},
'target': {
"slug": u'other-role-slug', 'ou': {'slug': 'some-ou'}, 'service': None}
}
]
import_context = ImportContext()
rd = RoleDeserializer(some_role_dict, import_context)
rd.deserialize()
perm_created, perm_deleted = rd.permissions()
assert len(perm_created) == 1
assert len(perm_deleted) == 0
del some_role_dict['permissions']
role = Role.objects.get(slug=some_role_dict['slug'])
assert role.permissions.count() == 1
perm = role.permissions.first()
assert perm.operation.slug == 'add'
assert not perm.ou
assert perm.target == other_role
# that one should delete permissions
rd = RoleDeserializer(some_role_dict, import_context)
role, _ = rd.deserialize()
perm_created, perm_deleted = rd.permissions()
assert role.permissions.count() == 0
assert len(perm_created) == 0
assert len(perm_deleted) == 1
def test_permission_on_role(db):
perm_ou = OU.objects.create(slug='perm-ou', name='perm ou')
perm_role = Role.objects.create(slug='perm-role', ou=perm_ou, name='perm role')
some_role_dict = {
'name': 'some role', 'slug': 'some-role-slug', 'ou': None, 'service': None}
some_role_dict['permissions'] = [{
"operation": {
"slug": "admin"
},
"ou": {
"slug": "perm-ou",
"name": "perm-ou"
},
'target_ct': {'app_label': u'a2_rbac', 'model': u'role'},
"target": {
"slug": "perm-role",
"ou": {
"slug": "perm-ou",
"name": "perm ou"
},
"service": None,
"name": "perm role"
}
}]
import_context = ImportContext()
rd = RoleDeserializer(some_role_dict, import_context)
rd.deserialize()
perm_created, perm_deleted = rd.permissions()
assert len(perm_created) == 1
perm = perm_created[0]
assert perm.target == perm_role
assert perm.ou == perm_ou
assert perm.operation.slug == 'admin'
def test_permission_on_contentype(db):
perm_ou = OU.objects.create(slug='perm-ou', name='perm ou')
some_role_dict = {
'name': 'some role', 'slug': 'some-role-slug', 'ou': None, 'service': None}
some_role_dict['permissions'] = [{
"operation": {
"slug": "admin"
},
"ou": {
"slug": "perm-ou",
"name": "perm-ou"
},
'target_ct': {"model": "contenttype", "app_label": "contenttypes"},
"target": {"model": "logentry", "app_label": "admin"}
}]
import_context = ImportContext()
rd = RoleDeserializer(some_role_dict, import_context)
rd.deserialize()
perm_created, perm_deleted = rd.permissions()
assert len(perm_created) == 1
perm = perm_created[0]
assert perm.target.app_label == 'admin'
assert perm.target.model == 'logentry'
assert perm.ou == perm_ou
def import_ou_created(db):
uuid = get_hex_uuid()
ou_d = {'uuid': uuid, 'slug': 'ou-slug', 'name': 'ou name'}
ou, status = import_ou(ou_d)
assert status == 'created'
assert ou.uuid == ou_d['uuid']
assert ou.slug == ou_d['slug']
assert ou.name == ou_d['name']
def import_ou_updated(db):
ou = OU.objects.create(slug='some-ou', name='ou name')
ou_d = {'uuid': ou.uuid, 'slug': ou.slug, 'name': 'new name'}
ou_updated, status = import_ou(ou_d)
assert status == 'updated'
assert ou == ou_updated
assert ou.name == 'new name'
def testi_import_site_empty():
res = import_site({}, ImportContext())
assert res.roles == {'created': [], 'updated': []}
assert res.ous == {'created': [], 'updated': []}
assert res.parentings == {'created': [], 'deleted': []}
def test_import_site_roles(db):
parent_role_dict = {
'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(),
'ou': None, 'service': None}
child_role_dict = {
'name': 'child role', 'slug': 'child-role', 'parents': [parent_role_dict],
'uuid': get_hex_uuid(), 'ou': None, 'service': None}
roles = [
parent_role_dict,
child_role_dict
]
res = import_site({'roles': roles}, ImportContext())
created_roles = res.roles['created']
assert len(created_roles) == 2
parent_role = Role.objects.get(**parent_role_dict)
del child_role_dict['parents']
child_role = Role.objects.get(**child_role_dict)
assert created_roles[0] == parent_role
assert created_roles[1] == child_role
assert len(res.parentings['created']) == 1
assert res.parentings['created'][0] == RoleParenting.objects.get(
child=child_role, parent=parent_role, direct=True)
def test_roles_import_ignore_technical_role(db):
roles = [{
'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}]
res = import_site({'roles': roles}, ImportContext())
assert res.roles == {'created': [], 'updated': []}
def test_roles_import_ignore_technical_role_with_service(db):
roles = [{
'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}]
res = import_site({'roles': roles}, ImportContext())
assert res.roles == {'created': [], 'updated': []}
def test_import_role_handle_manager_role_parenting(db):
parent_role_dict = {
'name': 'grand parent role', 'slug': 'grand-parent-role', 'uuid': get_hex_uuid(),
'ou': None, 'service': None}
parent_role_manager_dict = {
'name': 'Administrateur du role grand parent role',
'slug': '_a2-managers-of-role-grand-parent-role', 'uuid': get_hex_uuid(),
'ou': None, 'service': None}
child_role_dict = {
'name': 'child role', 'slug': 'child-role',
'parents': [parent_role_dict, parent_role_manager_dict],
'uuid': get_hex_uuid(), 'ou': None, 'service': None}
import_site({'roles': [child_role_dict, parent_role_dict]}, ImportContext())
child = Role.objects.get(slug='child-role')
manager = Role.objects.get(slug='_a2-managers-of-role-grand-parent-role')
RoleParenting.objects.get(child=child, parent=manager, direct=True)
def test_import_roles_role_delete_orphans(db):
roles = [{
'name': 'some role', 'description': 'some role description', 'slug': '_some-role'}]
with pytest.raises(DataImportError):
import_site({'roles': roles}, ImportContext(role_delete_orphans=True))
def test_import_ou(db):
uuid = get_hex_uuid()
name = 'ou name'
ous = [{'uuid': uuid, 'slug': 'ou-slug', 'name': name}]
res = import_site({'ous': ous}, ImportContext())
assert len(res.ous['created']) == 1
ou = res.ous['created'][0]
assert ou.uuid == uuid
assert ou.name == name
Role.objects.get(slug='_a2-managers-of-ou-slug')
def test_import_ou_already_existing(db):
uuid = get_hex_uuid()
ou_d = {'uuid': uuid, 'slug': 'ou-slug', 'name': 'ou name'}
ou = OU.objects.create(**ou_d)
num_ous = OU.objects.count()
res = import_site({'ous': [ou_d]}, ImportContext())
assert len(res.ous['created']) == 0
assert num_ous == OU.objects.count()
assert ou == OU.objects.get(uuid=uuid)

View File

@ -0,0 +1,132 @@
import __builtin__
import json
from django.core import management
import pytest
from django_rbac.utils import get_role_model
def dummy_export_site(*args):
return {'roles': [{'name': 'role1'}]}
def test_export_role_cmd_stdout(db, capsys, monkeypatch):
import authentic2.management.commands.export_site
monkeypatch.setattr(
authentic2.management.commands.export_site, 'export_site', dummy_export_site)
management.call_command('export_site')
out, err = capsys.readouterr()
assert json.loads(out) == dummy_export_site()
def test_export_role_cmd_to_file(db, monkeypatch, tmpdir):
import authentic2.management.commands.export_site
monkeypatch.setattr(
authentic2.management.commands.export_site, 'export_site', dummy_export_site)
outfile = tmpdir.join('export.json')
management.call_command('export_site', '--output', outfile.strpath)
with outfile.open('r') as f:
assert json.loads(f.read()) == dummy_export_site()
def test_import_site_cmd(db, tmpdir, monkeypatch):
export_file = tmpdir.join('roles-export.json')
with export_file.open('w'):
export_file.write(json.dumps({'roles': []}))
management.call_command('import_site', export_file.strpath)
def test_import_site_cmd_infos_on_stdout(db, tmpdir, monkeypatch, capsys):
export_file = tmpdir.join('roles-export.json')
with export_file.open('w'):
export_file.write(json.dumps(
{'roles': [{
'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name',
'ou': None, 'service': None}]}))
management.call_command('import_site', export_file.strpath)
out, err = capsys.readouterr()
assert "Real run" in out
assert "1 roles created" in out
assert "0 roles updated" in out
def test_import_site_transaction_rollback_on_error(db, tmpdir, monkeypatch, capsys):
export_file = tmpdir.join('roles-export.json')
with export_file.open('w'):
export_file.write(json.dumps({'roles': []}))
Role = get_role_model()
def exception_import_site(*args):
Role.objects.create(slug='role-slug')
raise Exception()
import authentic2.management.commands.import_site
monkeypatch.setattr(
authentic2.management.commands.import_site, 'import_site', exception_import_site)
with pytest.raises(Exception):
management.call_command('import_site', export_file.strpath)
with pytest.raises(Role.DoesNotExist):
Role.objects.get(slug='role-slug')
def test_import_site_transaction_rollback_on_dry_run(db, tmpdir, monkeypatch, capsys):
export_file = tmpdir.join('roles-export.json')
with export_file.open('w'):
export_file.write(json.dumps(
{'roles': [{
'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name',
'ou': None, 'service': None}]}))
Role = get_role_model()
management.call_command('import_site', '--dry-run', export_file.strpath)
with pytest.raises(Role.DoesNotExist):
Role.objects.get(slug='role-slug')
def test_import_site_cmd_unhandled_context_option(db, tmpdir, monkeypatch, capsys):
from authentic2.data_transfer import DataImportError
export_file = tmpdir.join('roles-export.json')
with export_file.open('w'):
export_file.write(json.dumps(
{'roles': [{
'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name',
'ou': None, 'service': None}]}))
get_role_model().objects.create(uuid='dqfewrvesvews2532', slug='role-slug', name='role-name')
with pytest.raises(DataImportError):
management.call_command(
'import_site', '-o', 'role-delete-orphans', export_file.strpath)
def test_import_site_cmd_unknown_context_option(db, tmpdir, monkeypatch, capsys):
from django.core.management.base import CommandError
export_file = tmpdir.join('roles-export.json')
with pytest.raises(CommandError):
management.call_command('import_site', '-o', 'unknown-option', export_file.strpath)
def test_import_site_confirm_prompt_yes(db, tmpdir, monkeypatch):
export_file = tmpdir.join('roles-export.json')
with export_file.open('w'):
export_file.write(json.dumps(
{'roles': [{
'uuid': 'dqfewrvesvews2532', 'slug': 'role-slug', 'name': 'role-name',
'ou': None, 'service': None}]}))
def yes_raw_input(*args, **kwargs):
return 'yes'
monkeypatch.setattr(__builtin__, 'raw_input', yes_raw_input)
management.call_command('import_site', export_file.strpath, stdin='yes')
assert get_role_model().objects.get(uuid='dqfewrvesvews2532')