a2_rbac: move role attributes to real model fields (#69895)

This commit is contained in:
Valentin Deniaud 2022-10-25 14:27:52 +02:00
parent 9ba3df609b
commit 5f0c03e32f
13 changed files with 215 additions and 194 deletions

View File

@ -38,10 +38,6 @@ class RoleChildInline(admin.TabularInline):
return super().get_queryset(request).filter(direct=True)
class RoleAttributeInline(admin.TabularInline):
model = models.RoleAttribute
class RoleAdmin(admin.ModelAdmin):
inlines = [RoleChildInline, RoleParentInline]
fields = (
@ -62,7 +58,6 @@ class RoleAdmin(admin.ModelAdmin):
list_display = ('__str__', 'slug', 'ou', 'service', 'admin_scope')
list_select_related = True
list_filter = ['ou', 'service']
inlines = [RoleAttributeInline]
class OrganizationalUnitAdmin(admin.ModelAdmin):

View File

@ -0,0 +1,36 @@
# Generated by Django 2.2.26 on 2022-10-25 09:35
import django
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('a2_rbac', '0033_remove_old_operation_fk'),
]
operations = [
migrations.AddField(
model_name='role',
name='details',
field=models.TextField(blank=True, verbose_name='Role details (frontoffice)'),
),
migrations.AddField(
model_name='role',
name='emails',
field=django.contrib.postgres.fields.ArrayField(
base_field=models.EmailField(max_length=254), default=list, size=None
),
),
migrations.AddField(
model_name='role',
name='emails_to_members',
field=models.BooleanField(default=True, verbose_name='Emails to members'),
),
migrations.AddField(
model_name='role',
name='is_superuser',
field=models.BooleanField(default=False),
),
]

View File

@ -0,0 +1,61 @@
import json
from django.db import migrations
def populate_role_fields(apps, schema_editor):
Role = apps.get_model('a2_rbac', 'Role')
fields = {'details', 'emails', 'emails_to_members', 'is_superuser'}
roles = list(Role.objects.all().prefetch_related('attributes'))
for role in roles:
for attribute in role.attributes.all():
if attribute.name not in fields:
continue
try:
value = json.loads(attribute.value)
except json.JSONDecodeError:
continue
if attribute.name == 'emails':
if not isinstance(value, list):
continue
value = [x[:254] for x in value]
if attribute.name == 'details' and not isinstance(value, str):
continue
if attribute.name in ('emails_to_members', 'is_superuser') and not isinstance(value, bool):
continue
setattr(role, attribute.name, value)
Role.objects.bulk_update(roles, fields, batch_size=1000)
def reverse_populate_role_fields(apps, schema_editor):
Role = apps.get_model('a2_rbac', 'Role')
RoleAttribute = apps.get_model('a2_rbac', 'RoleAttribute')
fields = ['details', 'emails', 'emails_to_members']
attributes = []
for role in Role.objects.all():
for field in fields:
attributes.append(
RoleAttribute(
role_id=role.pk, name=field, kind='json', value=json.dumps(getattr(role, field))
)
)
RoleAttribute.objects.bulk_create(attributes, batch_size=1000)
class Migration(migrations.Migration):
dependencies = [
('a2_rbac', '0034_new_role_fields'),
]
operations = [
migrations.RunPython(populate_role_fields, reverse_code=reverse_populate_role_fields),
]

View File

@ -0,0 +1,16 @@
# Generated by Django 2.2.26 on 2022-10-25 10:33
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('a2_rbac', '0035_populate_role_fields'),
]
operations = [
migrations.DeleteModel(
name='RoleAttribute',
),
]

View File

@ -23,6 +23,7 @@ from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation
from django.contrib.contenttypes.models import ContentType
from django.contrib.postgres.fields import ArrayField
from django.core.exceptions import ValidationError
from django.core.validators import MinValueValidator
from django.db import models
@ -379,6 +380,10 @@ class Role(AbstractBase):
)
permissions = models.ManyToManyField(to=Permission, related_name='roles', blank=True)
name = models.TextField(verbose_name=_('name'))
details = models.TextField(_('Role details (frontoffice)'), blank=True)
emails = ArrayField(models.EmailField(), default=list)
emails_to_members = models.BooleanField(_('Emails to members'), default=True)
is_superuser = models.BooleanField(default=False)
admin_scope_ct = models.ForeignKey(
to='contenttypes.ContentType',
null=True,
@ -727,24 +732,6 @@ class RoleParenting(models.Model):
return '{} {}> {}'.format(self.parent.name, '-' if self.direct else '~', self.child.name)
class RoleAttribute(models.Model):
KINDS = (('string', _('string')),)
role = models.ForeignKey(
to=Role, verbose_name=_('role'), related_name='attributes', on_delete=models.CASCADE
)
name = models.CharField(max_length=64, verbose_name=_('name'))
kind = models.CharField(max_length=32, choices=KINDS, verbose_name=_('kind'))
value = models.TextField(verbose_name=_('value'))
class Meta:
verbose_name = 'role attribute'
verbose_name_plural = _('role attributes')
unique_together = (('role', 'name', 'kind', 'value'),)
def to_json(self):
return {'name': self.name, 'kind': self.kind, 'value': self.value}
class Operation(models.Model):
slug = models.CharField(max_length=32, verbose_name=_('slug'), unique=True)

View File

@ -101,7 +101,6 @@ default_settings = dict(
'authentic2.attributes_ng.sources.function',
'authentic2.attributes_ng.sources.django_user',
'authentic2.attributes_ng.sources.ldap',
'authentic2.attributes_ng.sources.service_roles',
),
definition='List of attribute backend classes or modules',
),

View File

@ -1,71 +0,0 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.utils.translation import gettext_lazy as _
from authentic2.a2_rbac.models import Role
from ...decorators import to_list
from ...models import Service
@to_list
def get_instances(ctx):
return [None]
@to_list
def get_attribute_names(instance, ctx):
service = ctx.get('service')
if not isinstance(service, Service):
return
names = []
for service_role in Role.objects.filter(service=service).prefetch_related('attributes'):
for service_role_attribute in service_role.attributes.all():
if service_role_attribute.name in names:
continue
names.append(service_role_attribute.name)
names.sort()
for name in names:
yield (name, '%s (%s)' % (name, _('role attribute')))
def get_dependencies(instance, ctx):
return (
'user',
'service',
)
def get_attributes(instance, ctx):
user = ctx.get('user')
service = ctx.get('service')
if not user or not service:
return ctx
ctx = ctx.copy()
roles = Role.objects.for_user(user).filter(service=service).prefetch_related('attributes')
for service_role in roles:
for service_role_attribute in service_role.attributes.all():
name = service_role_attribute.name
value = service_role_attribute.value
values = ctx.get(name, [])
if not isinstance(values, (list, tuple, set)):
values = [values]
values = set(values)
if value not in values:
values.add(value)
ctx[name] = values
return ctx

View File

@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
import uuid
from functools import wraps
@ -24,14 +25,7 @@ from django.core.validators import validate_slug
from django.utils.text import format_lazy
from django.utils.translation import gettext_lazy as _
from authentic2.a2_rbac.models import (
Operation,
OrganizationalUnit,
Permission,
Role,
RoleAttribute,
RoleParenting,
)
from authentic2.a2_rbac.models import Operation, OrganizationalUnit, Permission, Role, RoleParenting
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.decorators import errorcollector
from authentic2.utils.lazy import lazy_join
@ -110,7 +104,7 @@ def export_ous(context):
def export_roles(context):
"""Serialize roles in role_queryset"""
return [role.export_json(attributes=True, parents=True, permissions=True) for role in context.role_qs]
return [role.export_json(parents=True, permissions=True) for role in context.role_qs]
def search_ou(ou_d):
@ -144,7 +138,7 @@ class ImportContext:
be deleted
role_attributes_update: for each role in the import data,
role_attributes_update: legacy, for each role in the import data,
attributes will deleted and re-created
@ -277,16 +271,12 @@ class RoleDeserializer:
@wraps_validationerror
def attributes(self):
"""Update attributes (delete everything then create)"""
"""Compatibility with old import files, set Role fields using attributes data"""
created, deleted = [], []
for attr in self._obj.attributes.all():
attr.delete()
deleted.append(attr)
# Create attributes
if self._attributes:
for attr_dict in self._attributes:
attr_dict['role'] = self._obj
created.append(RoleAttribute.objects.create(**attr_dict))
setattr(self._obj, attr_dict['name'], json.loads(attr_dict['value']))
return created, deleted

View File

@ -31,7 +31,7 @@ from django.utils.translation import gettext_lazy as _
from django.utils.translation import pgettext
from django_select2.forms import HeavySelect2Widget
from authentic2.a2_rbac.models import Operation, OrganizationalUnit, Permission, Role, RoleAttribute
from authentic2.a2_rbac.models import Operation, OrganizationalUnit, Permission, Role
from authentic2.a2_rbac.utils import generate_slug, get_default_ou
from authentic2.custom_user.backends import DjangoRBACBackend
from authentic2.forms.fields import (
@ -611,55 +611,24 @@ class ServiceSearchForm(OUSearchForm, NameSearchForm):
class RoleEditForm(SlugMixin, HideOUFieldMixin, LimitQuerysetFormMixin, CssClass, forms.ModelForm):
ou = forms.ModelChoiceField(
queryset=OrganizationalUnit.objects, required=True, label=_('Organizational unit')
)
details = forms.CharField(
label=_('Role details (frontoffice)'), widget=forms.Textarea, initial='', required=False
)
emails = CommaSeparatedCharField(
label=_('Emails'),
item_validators=[EmailValidator()],
required=False,
help_text=_('Emails must be separated by commas.'),
)
emails_to_members = forms.BooleanField(required=False, initial=True, label=_('Emails to members'))
class Meta:
model = Role
fields = ('name', 'slug', 'ou', 'description')
fields = ('name', 'slug', 'ou', 'description', 'details', 'emails', 'emails_to_members')
widgets = {
'name': forms.TextInput(),
}
def __init__(self, *args, **kwargs):
instance = kwargs.get('instance')
if instance:
fields = [x.name for x in Role._meta.get_fields()]
initial = kwargs.setdefault('initial', {})
role_attributes = RoleAttribute.objects.filter(role=instance, kind='json')
for role_attribute in role_attributes:
if role_attribute.name in fields:
continue
initial[role_attribute.name] = json.loads(role_attribute.value)
super().__init__(*args, **kwargs)
def save(self, commit=True):
fields = [x.name for x in Role._meta.get_fields()]
assert commit
instance = super().save(commit=commit)
for field in self.cleaned_data:
if field in fields:
continue
value = json.dumps(self.cleaned_data[field])
ra, created = RoleAttribute.objects.get_or_create(
role=instance, name=field, kind='json', defaults={'value': value}
)
if not created and ra.value != value:
ra.value = value
ra.save()
instance.save()
return instance
if 'ou' in self.fields:
self.fields['ou'].required = True
class OUEditForm(SlugMixin, CssClass, forms.ModelForm):

View File

@ -21,7 +21,7 @@ from django.core.management import call_command
from authentic2.a2_rbac.models import CHANGE_OP, MANAGE_MEMBERS_OP, Operation
from authentic2.a2_rbac.models import OrganizationalUnit as OU
from authentic2.a2_rbac.models import Permission, Role, RoleAttribute
from authentic2.a2_rbac.models import Permission, Role
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.custom_user.models import User
from authentic2.models import Service
@ -182,24 +182,6 @@ def test_role_with_service_with_ou_export_json(db):
assert role_dict['service'] == {'slug': service.slug, 'ou': {'uuid': ou.uuid, 'slug': 'ou', 'name': 'ou'}}
def test_role_with_attributes_export_json(db):
role = Role.objects.create(name='some role')
attr1 = RoleAttribute.objects.create(role=role, name='attr1_name', kind='string', value='attr1_value')
attr2 = RoleAttribute.objects.create(role=role, name='attr2_name', kind='string', value='attr2_value')
role_dict = role.export_json(attributes=True)
attributes = role_dict['attributes']
assert len(attributes) == 2
expected_attr_names = {attr1.name, attr2.name}
for attr_dict in attributes:
assert attr_dict['name'] in expected_attr_names
expected_attr_names.remove(attr_dict['name'])
target_attr = RoleAttribute.objects.filter(name=attr_dict['name']).first()
assert attr_dict['kind'] == target_attr.kind
assert attr_dict['value'] == target_attr.value
def test_role_with_parents_export_json(db):
grand_parent_role = Role.objects.create(name='test grand parent role', slug='test-grand-parent-role')
parent_1_role = Role.objects.create(name='test parent 1 role', slug='test-parent-1-role')
@ -750,3 +732,68 @@ def test_a2_rbac_operation_migration(migration, settings):
).count()
== 1
)
def test_a2_rbac_role_attribute_migration(migration, settings):
migrate_from = [('a2_rbac', '0034_new_role_fields')]
migrate_to = [('a2_rbac', '0036_delete_roleattribute')]
old_apps = migration.before(migrate_from)
Role = old_apps.get_model('a2_rbac', 'Role')
RoleAttribute = old_apps.get_model('a2_rbac', 'RoleAttribute')
role = Role.objects.create(name='role', slug='1')
RoleAttribute.objects.create(role=role, kind='json', name='details', value='"abc"')
RoleAttribute.objects.create(role=role, kind='json', name='emails', value='["a@a.com", "b@b.com"]')
RoleAttribute.objects.create(role=role, kind='json', name='emails_to_members', value='false')
RoleAttribute.objects.create(role=role, kind='string', name='is_superuser', value='true')
role = Role.objects.create(name='role_default_values', slug='2')
RoleAttribute.objects.create(role=role, kind='json', name='details', value='""')
RoleAttribute.objects.create(role=role, kind='json', name='emails', value='[]')
RoleAttribute.objects.create(role=role, kind='json', name='emails_to_members', value='true')
RoleAttribute.objects.create(role=role, kind='string', name='is_superuser', value='false')
role = Role.objects.create(name='role_no_attribute', slug='3')
role = Role.objects.create(name='role_bad_attributes', slug='4')
RoleAttribute.objects.create(role=role, kind='json', name='details', value='bad')
RoleAttribute.objects.create(role=role, kind='json', name='emails', value='true')
RoleAttribute.objects.create(role=role, kind='json', name='emails_to_members', value='bad')
RoleAttribute.objects.create(role=role, kind='string', name='unknown', value='xxx')
role = Role.objects.create(name='role_one_attribute', slug='5')
RoleAttribute.objects.create(role=role, kind='json', name='details', value='"xxx"')
new_apps = migration.apply(migrate_to)
Role = new_apps.get_model('a2_rbac', 'Role')
role = Role.objects.get(name='role')
assert role.details == 'abc'
assert role.emails == ['a@a.com', 'b@b.com']
assert role.emails_to_members is False
assert role.is_superuser is True
role = Role.objects.get(name='role_default_values')
assert role.details == ''
assert role.emails == []
assert role.emails_to_members is True
assert role.is_superuser is False
role = Role.objects.get(name='role_no_attribute')
assert role.details == ''
assert role.emails == []
assert role.emails_to_members is True
assert role.is_superuser is False
role = Role.objects.get(name='role_bad_attributes')
assert role.details == ''
assert role.emails == []
assert role.emails_to_members is True
assert role.is_superuser is False
role = Role.objects.get(name='role_one_attribute')
assert role.details == 'xxx'
assert role.emails == []
assert role.emails_to_members is True
assert role.is_superuser is False

View File

@ -238,8 +238,8 @@ def test_role_deserializer_update_fields(db):
def test_role_deserializer_with_attributes(db):
attributes_data = {
'attr1_name': dict(name='attr1_name', kind='string', value='attr1_value'),
'attr2_name': dict(name='attr2_name', kind='string', value='attr2_value'),
'is_superuser': dict(name='is_superuser', kind='string', value='true'),
'emails': dict(name='emails', kind='json', value='["a@a.com"]'),
}
rd = RoleDeserializer(
{
@ -254,17 +254,10 @@ def test_role_deserializer_with_attributes(db):
ImportContext(),
)
role, status = rd.deserialize()
created, dummy = rd.attributes()
rd.attributes()
assert status == 'created'
assert role.attributes.count() == 2
assert len(created) == 2
for attr in created:
attr_dict = attributes_data[attr.name]
assert attr_dict['name'] == attr.name
assert attr_dict['kind'] == attr.kind
assert attr_dict['value'] == attr.value
del attributes_data[attr.name]
assert role.is_superuser is True
assert role.emails == ['a@a.com']
def test_role_deserializer_creates_admin_role(db):

View File

@ -173,9 +173,8 @@ class SamlSP:
# Admin role
self.admin_role = Role.objects.create(
name='Administrator', slug='administrator', service=self.provider
name='Administrator', slug='administrator', service=self.provider, is_superuser=True
)
self.admin_role.attributes.create(name='superuser', kind='string', value='true')
# SAML attributes mapping
self.saml_first_name_attribute = self.provider.attributes.create(
@ -957,14 +956,8 @@ def test_add_attributes_user_ou1_role_ou2(add_attributes_all, user_ou1, role_ou2
service_role = Role.objects.create(
name='Role of service', slug='role-of-service', ou=ou1, service=add_attributes_all.provider
)
service_role.attributes.create(name='is_admin', kind='string', value='true')
user_ou1.roles.add(service_role)
add_attributes_all.get_definitions.return_value.append(
SAMLAttribute(name_format='basic', name='is_admin', attribute_name='is_admin'),
)
attributes = add_attributes_all(user_ou1)
assert attributes == {
'a2_role_names': {'Role of service', 'role_ou2'},
@ -999,7 +992,6 @@ def test_add_attributes_user_ou1_role_ou2(add_attributes_all, user_ou1, role_ou2
'django_user_password': {'abba0b6ff456806bab66baed93e6d9c4'},
'django_user_username': {'john.doe'},
'django_user_uuid': {user_ou1.uuid},
'is_admin': {'true'},
}

View File

@ -163,20 +163,27 @@ def test_manager_create_role(superuser_or_admin, app):
def test_manager_edit_role(superuser_or_admin, app, simple_role):
resp = login(app, superuser_or_admin, '/manage/roles/%s/edit/' % simple_role.pk)
resp.form['details'] = 'xxx'
resp.form['emails'] = 'test@example.com'
resp.form['emails_to_members'] = False
resp = resp.form.submit().follow()
assert set(simple_role.attributes.values_list('name', 'value')) == {
('emails_to_members', 'false'),
('emails', '["test@example.com"]'),
('details', '"xxx"'),
}
simple_role.refresh_from_db()
assert simple_role.details == 'xxx'
assert simple_role.emails == []
assert simple_role.emails_to_members is False
resp = app.get('/manage/roles/%s/edit/' % simple_role.pk)
resp.form['emails'] = 'test@example.com'
resp = resp.form.submit().follow()
simple_role.refresh_from_db()
assert simple_role.emails == ['test@example.com']
resp = app.get('/manage/roles/%s/edit/' % simple_role.pk)
resp.form['emails'] = 'test@example.com, hop@example.com'
resp = resp.form.submit().follow()
emails = simple_role.attributes.get(name='emails')
assert set(json.loads(emails.value)) == {'test@example.com', 'hop@example.com'}
simple_role.refresh_from_db()
assert set(simple_role.emails) == {'test@example.com', 'hop@example.com'}
resp = app.get('/manage/roles/%s/edit/' % simple_role.pk)
resp.form['emails'] = 'xxx'