From 868ac7837716430d00618867fc8864c67aac6af1 Mon Sep 17 00:00:00 2001 From: Benjamin Dauvergne Date: Fri, 13 Sep 2019 18:09:09 +0200 Subject: [PATCH] Customize admin roles (#35737) --- .gitignore | 1 + src/authentic2_wallonie_connect/apps.py | 4 + src/authentic2_wallonie_connect/roles.py | 117 ++++++++++++++ tests/settings.py | 5 + tests/test_acl.py | 194 +++++++++++++++++++++++ tests/utils.py | 45 ++++++ 6 files changed, 366 insertions(+) create mode 100644 src/authentic2_wallonie_connect/roles.py create mode 100644 tests/test_acl.py create mode 100644 tests/utils.py diff --git a/.gitignore b/.gitignore index 9cbecda..5401b52 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ pip-selfcheck.json src/authentic2_wallonie_connect.egg-info/ test_py27-dj111-pg-coverage_results.xml *.pyc +dist diff --git a/src/authentic2_wallonie_connect/apps.py b/src/authentic2_wallonie_connect/apps.py index 0f34e48..40f9a91 100644 --- a/src/authentic2_wallonie_connect/apps.py +++ b/src/authentic2_wallonie_connect/apps.py @@ -19,3 +19,7 @@ import django.apps class AppConfig(django.apps.AppConfig): name = 'authentic2_wallonie_connect' + + def ready(self): + from . import roles + roles.init(self) diff --git a/src/authentic2_wallonie_connect/roles.py b/src/authentic2_wallonie_connect/roles.py new file mode 100644 index 0000000..32666d5 --- /dev/null +++ b/src/authentic2_wallonie_connect/roles.py @@ -0,0 +1,117 @@ +# coding: utf-8 +# +# authentic2-wallonie-connect - Authentic2 plugin for the Wallonie Connect usecase +# Copyright (C) 2019 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from __future__ import unicode_literals + +from django.apps import apps +from django.db import router +from django.db.models.signals import post_save, post_migrate +from django.contrib.contenttypes.models import ContentType + +from django_rbac.utils import get_operation +from django_rbac.models import ( + ADMIN_OP, + CHANGE_OP, + ADD_OP, +) + +from authentic2.custom_user.models import User +from authentic2.a2_rbac.models import ( + OrganizationalUnit as OU, + Role, + Permission, + RESET_PASSWORD_OP, + ACTIVATE_OP, + CHANGE_EMAIL_OP, +) +from authentic2.models import Service + + +def init(app_config): + post_migrate.connect(post_migrate_handler, sender=apps.get_app_config('a2_rbac')) + post_save.connect(ou_post_save_handler, sender=OU) + + +def post_migrate_handler(using, **kwargs): + if not router.allow_migrate(using, Role): + return + create_base_roles() + + +def create_base_roles(): + admin, created = Role.objects.get_or_create( + slug='_imio-wc-admin', + ou=None, + defaults={ + 'name': 'Administrateur', + }) + set_permissions(admin, [ + (User, None, [ADMIN_OP]), + (Role, None, [ADMIN_OP]), + (Service, None, [ADMIN_OP]), + (OU, None, [ADMIN_OP]), + ]) + + +def ou_post_save_handler(instance, **kwargs): + update_ou_roles(instance) + + +def update_ous_roles(): + for ou in OU.objects.all(): + update_ou_roles(ou) + + +def update_ou_roles(ou): + if not ou.username_is_unique: + ou.username_is_unique = True + ou.save() + # Create admin of users + user_admin, created = Role.objects.get_or_create( + slug='_imio-wc-user-admin', + ou=ou, + defaults={ + 'name': 'Administrateur des utilisateurs', + }) + set_permissions(user_admin, [(User, ou, [ + ADD_OP, CHANGE_OP, RESET_PASSWORD_OP, + ACTIVATE_OP, CHANGE_EMAIL_OP])]) + + # Create admin of roles + role_admin, created = Role.objects.get_or_create( + slug='_imio-wc-role-admin', + ou=ou, + defaults={ + 'name': 'Administrateur des rôles', + }) + set_permissions(role_admin, [(Role, ou, [CHANGE_OP])]) + + +def set_permissions(role, model_scope_ops): + permissions = set() + for model, scope, operations in model_scope_ops: + for operation in operations: + permission, created = Permission.objects.get_or_create( + operation=get_operation(operation), + ou=scope, + target_ct=ContentType.objects.get_for_model(ContentType), + target_id=ContentType.objects.get_for_model(model).pk) + permissions.add(permission) + if set(role.permissions.all()) != permissions: + role.permissions = permissions + diff --git a/tests/settings.py b/tests/settings.py index 2e48c94..9773ae4 100644 --- a/tests/settings.py +++ b/tests/settings.py @@ -9,3 +9,8 @@ DATABASES = { }, } } + +A2_RBAC_MANAGED_CONTENT_TYPES = () +A2_RBAC_ROLE_ADMIN_RESTRICT_TO_OU_USERS = True + +INSTALLED_APPS += ('authentic2_wallonie_connect',) diff --git a/tests/test_acl.py b/tests/test_acl.py new file mode 100644 index 0000000..3bcccaa --- /dev/null +++ b/tests/test_acl.py @@ -0,0 +1,194 @@ +# coding: utf-8 +# +# authentic2-wallonie-connect - Authentic2 plugin for the Wallonie Connect usecase +# Copyright (C) 2019 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from __future__ import unicode_literals + +import pytest + +from authentic2.custom_user.models import User +from authentic2.a2_rbac.models import OrganizationalUnit as OU, Role +from authentic2.models import Service + +from utils import login + + +@pytest.fixture(autouse=True) +def base(db): + class Base(object): + pass + + b = Base() + + OU.objects.filter(default=True).update(name='IMIO', slug='imio') + b.ou_imio = OU.objects.get(slug='imio') + b.role_global_admin = Role.objects.get(slug='_imio-wc-admin') + b.user_admin = User.objects.create(ou=b.ou_imio, username='admin') + b.user_admin.roles.add(b.role_global_admin) + + b.ou_liege = OU.objects.create(name='Liège', slug='liege') + b.role_liege_user_admin = Role.objects.get(slug='_imio-wc-user-admin', ou=b.ou_liege) + b.role_liege_role_admin = Role.objects.get(slug='_imio-wc-role-admin', ou=b.ou_liege) + b.service_liege = Service.objects.create(ou=b.ou_liege, slug='service', name='Service') + b.role_service_liege = Role.objects.create(ou=b.ou_liege, slug='service', name='Accès Service') + b.service_liege.authorized_roles.through.objects.create(service=b.service_liege, role=b.role_service_liege) + b.user_liege = User.objects.create(ou=b.ou_liege, username='user-liege') + b.user_liege_admin = User.objects.create(ou=b.ou_liege, username='admin-liege') + b.user_liege_admin.roles.add(b.role_liege_user_admin) + b.user_liege_admin.roles.add(b.role_liege_role_admin) + b.user_liege_service_admin = User.objects.create(ou=b.ou_liege, username='service-admin-liege') + b.user_liege_service_admin.roles.add(b.role_service_liege.get_admin_role()) + + b.ou_tournay = OU.objects.create(name='Tournay', slug='tournay') + b.role_tournay_user_admin = Role.objects.get(slug='_imio-wc-user-admin', ou=b.ou_tournay) + b.role_tournay_role_admin = Role.objects.get(slug='_imio-wc-role-admin', ou=b.ou_tournay) + b.user_tournay = User.objects.create(ou=b.ou_tournay, username='user-tournay') + + # set all passwords + for user in User.objects.all(): + user.set_password(user.username) + user.save() + + return b + + +def test_imio_admin(app, base): + home = login(app, base.user_admin, '/manage/') + assert (set(elt.attrib['href'].strip('/').split('/')[1] for elt in home.pyquery('.apps li a')) + == set(['organizational-units', + 'roles', + 'users', + 'services'])) + + users = home.click('Users') + assert len(users.pyquery('table tbody tr')) == User.objects.count() + + # Can add user in IMIO + users_imio = app.get('/manage/users/?search-ou=%s' % base.ou_imio.pk) + assert len(users_imio.pyquery('table tbody tr')) == 1 + users_imio.click('Add user') + + # Can add user in Liège + users_liege = app.get('/manage/users/?search-ou=%s' % base.ou_liege.pk) + assert len(users_liege.pyquery('table tbody tr')) == 3 + users_liege.click('Add user') + + # Can edit user in IMIO + user_imio = app.get('/manage/users/%s/' % base.user_admin.pk) + assert (set(elt.text for elt in user_imio.pyquery('.actions a:not(.disabled)')) + == set(['Delete', 'Edit'])) + assert (set(elt.text for elt in user_imio.pyquery('.other_actions button')) + == set(['Suspend', + 'Reset password', + 'Modify roles', + 'Change user password', + 'Force password change on next login'])) + + # or Liège + user_liege = app.get('/manage/users/%s/' % base.user_liege_admin.pk) + assert (set(elt.text for elt in user_liege.pyquery('.actions a:not(.disabled)')) + == set(['Delete', 'Edit'])) + assert (set(elt.text for elt in user_liege.pyquery('.other_actions button')) + == set(['Suspend', + 'Reset password', + 'Modify roles', + 'Change user password', + 'Force password change on next login'])) + + +def test_liege_admin(app, base): + home = login(app, base.user_liege_admin, '/manage/') + assert (set(elt.attrib['href'].strip('/').split('/')[1] for elt in home.pyquery('.apps li a')) + == set(['roles', 'users'])) + + users = home.click('Users') + assert len(users.pyquery('table tbody tr')) == User.objects.filter(ou=base.ou_liege).count() + + # Can add user directly + users.click('Add user') + + # Cannot see user in IMIO or Tournay + app.get('/manage/users/%s/' % base.user_admin.pk, status=403) + app.get('/manage/users/%s/' % base.user_tournay.pk, status=403) + + # Cane edit but not delete user in Liège + user_liege = app.get('/manage/users/%s/' % base.user_liege_admin.pk) + assert (set(elt.text for elt in user_liege.pyquery('.actions a:not(.disabled)')) + == set(['Edit'])) + assert (set(elt.text for elt in user_liege.pyquery('.other_actions button')) + == set(['Suspend', + 'Reset password', + 'Modify roles', + 'Force password change on next login'])) + + +def test_liege_service_admin(app, base): + home = login(app, base.user_liege_service_admin, '/manage/') + assert (set(elt.attrib['href'].strip('/').split('/')[1] for elt in home.pyquery('.apps li a')) + == set(['roles', 'users'])) + + users = home.click('Users') + assert len(users.pyquery('table tbody tr')) == User.objects.filter(ou=base.ou_liege).count() + + # Cannot add user + assert len(users.pyquery('#add-user-btn.disabled')) == 1 + + # Can see user in Liège + app.get('/manage/users/%s/edit/' % base.user_liege.pk, status=403) + # Cannot see in IMIO or Tournay + app.get('/manage/users/%s/' % base.user_admin.pk, status=403) + app.get('/manage/users/%s/' % base.user_tournay.pk, status=403) + + # Cane edit but not delete user in Liège but modify roles + user_liege = app.get('/manage/users/%s/' % base.user_liege.pk) + # Can't do anthing on a user of Liège + assert (set(elt.text for elt in user_liege.pyquery('.actions a:not(.disabled)')) + == set([])) + assert (set(elt.text for elt in user_liege.pyquery('.other_actions button')) + == set(['Modify roles'])) + + # but modify members of the service role + modify_roles = app.get('/manage/users/%s/roles/' % base.user_liege.pk) + assert len(modify_roles.pyquery('table tbody tr td.name a')) == 1 + assert modify_roles.pyquery('table tbody tr td.name a')[0].text == 'Accès Service' + + assert base.user_liege.roles.count() == 0 + + modify_roles = app.post('/manage/users/%s/roles/' % base.user_liege.pk, + params={ + 'csrfmiddlewaretoken': app.cookies['csrftoken'], + 'action': 'add', + 'role': base.role_service_liege.pk + }) + + assert base.user_liege.roles.count() == 1 + assert base.user_liege.roles.get().name == 'Accès Service' + + roles = app.get('/manage/roles/') + assert len(roles.pyquery('table tbody tr')) == 1 + assert roles.pyquery('table tbody tr td.name a')[0].text == 'Accès Service' + + # Select3 only shows users from Liège + service_role = app.get('/manage/roles/%s/' % base.role_service_liege.pk) + select2_url = service_role.pyquery('select#id_user')[0].attrib['data-ajax--url'] + select2_field_id = service_role.pyquery('select#id_user')[0].attrib['data-field_id'] + + select2_response = app.get(select2_url, params={'field_id': select2_field_id, 'term': ''}) + assert select2_response.json['more'] is False + assert len(select2_response.json['results']) == 3 + ids = set(result['id'] for result in select2_response.json['results']) + assert User.objects.filter(id__in=ids, ou=base.ou_liege).count() == 3 diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000..965c04b --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,45 @@ +# authentic2-wallonie-connect - Authentic2 plugin for the Wallonie Connect usecase +# Copyright (C) 2019 Entr'ouvert +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from __future__ import unicode_literals + + +from django.core.urlresolvers import reverse +from django.shortcuts import resolve_url + + +def login(app, user, path=None, password=None, remember_me=None, args=None, kwargs=None): + if path: + args = args or [] + kwargs = kwargs or {} + path = resolve_url(path, *args, **kwargs) + login_page = app.get(path, status=302).maybe_follow() + else: + login_page = app.get(reverse('auth_login')) + assert login_page.request.path == reverse('auth_login') + form = login_page.form + form.set('username', user.username if hasattr(user, 'username') else user) + # password is supposed to be the same as username + form.set('password', password or user.username) + if remember_me is not None: + form.set('remember_me', bool(remember_me)) + response = form.submit(name='login-password-submit').follow() + if path: + assert response.request.path == path + else: + assert response.request.path == reverse('auth_homepage') + assert '_auth_user_id' in app.session + return response