manager: add child roles in role members view (#59664)

This commit is contained in:
Valentin Deniaud 2022-01-04 16:00:24 +01:00
parent bd437ddafb
commit 84bc4a1a56
10 changed files with 382 additions and 65 deletions

View File

@ -26,9 +26,11 @@ from django import forms
from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.urls import reverse
from django.utils.text import slugify
from django.utils.translation import pgettext, ugettext
from django.utils.translation import ugettext_lazy as _
from django_select2.forms import HeavySelect2Widget
from authentic2.a2_rbac.models import OrganizationalUnit, Permission, Role
from authentic2.a2_rbac.utils import generate_slug, get_default_ou
@ -855,3 +857,55 @@ class RolesCsvImportForm(LimitQuerysetFormMixin, forms.Form):
def add_line_error(self, error, line):
error = _('%(error)s (line %(number)d)') % {'error': error, 'number': line + 1}
self.add_error('import_file', error)
class HeavySelect2WidgetNoCache(HeavySelect2Widget):
def set_to_cache(self):
pass
class ChooseUserOrRoleForm(FormWithRequest, forms.Form):
user_or_role = forms.CharField(label=_('Add to role'))
action = forms.CharField(initial='add', widget=forms.HiddenInput)
def __init__(self, *args, **kwargs):
self.role = kwargs.pop('role', None)
super().__init__(*args, **kwargs)
self.fields['user_or_role'].widget = HeavySelect2WidgetNoCache(
data_url=reverse('user-or-role-select2-json', kwargs={'pk': self.role.pk})
)
def clean(self):
super().clean()
try:
object_type, pk = self.cleaned_data.get('user_or_role', '').split('-')
pk = int(pk)
except (ValueError, TypeError):
return
if object_type == 'user':
try:
self.cleaned_data['user'] = self.get_user_queryset(self.request.user, self.role).get(pk=pk)
except User.DoesNotExist:
return
elif object_type == 'role':
try:
self.cleaned_data['role'] = self.get_role_queryset(self.request.user, self.role).get(pk=pk)
except Role.DoesNotExist:
return
@staticmethod
def get_role_queryset(user, role):
qs = Role.objects.exclude(pk=role.pk)
perm = '%s.search_%s' % (Role._meta.app_label, Role._meta.model_name)
return user.filter_by_perm(perm, qs)
@staticmethod
def get_user_queryset(user, role):
qs = User.objects.all()
if app_settings.ROLE_MEMBERS_FROM_OU and role.ou:
qs = qs.filter(ou=role.ou)
perm = '%s.search_%s' % (User._meta.app_label, User._meta.model_name)
return user.filter_by_perm(perm, qs)

View File

@ -15,19 +15,23 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
from functools import reduce
from django.contrib import messages
from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType
from django.core import signing
from django.core.exceptions import PermissionDenied, ValidationError
from django.core.paginator import EmptyPage, Paginator
from django.db import transaction
from django.db.models import BooleanField, Count, ExpressionWrapper, F, Prefetch, Q
from django.db.models import BooleanField, Count, ExpressionWrapper, F, Prefetch, Q, Value
from django.db.models.functions import Cast
from django.http import Http404, JsonResponse
from django.shortcuts import get_object_or_404
from django.urls import reverse
from django.utils.functional import cached_property
from django.utils.translation import ugettext_lazy as _
from django.views.generic import FormView, TemplateView
from django.views.generic import DetailView, FormView, TemplateView
from django.views.generic.detail import SingleObjectMixin
from authentic2 import data_transfer, hooks
@ -37,9 +41,11 @@ from authentic2.apps.journal.views import JournalViewWithContext
from authentic2.forms.profile import modelform_factory
from authentic2.utils.misc import redirect
from . import app_settings, forms, resources, tables, views
from . import forms, resources, tables, views
from .journal_views import BaseJournalView
from .utils import has_show_username
from .utils import get_ou_count, has_show_username, label_from_user
User = get_user_model()
class RolesMixin:
@ -168,7 +174,7 @@ edit = RoleEditView.as_view()
class RoleMembersView(views.HideOUColumnMixin, RoleViewMixin, views.BaseSubTableView):
template_name = 'authentic2/manager/role_members.html'
form_class = forms.ChooseUserForm
form_class = forms.ChooseUserOrRoleForm
success_url = '.'
search_form_class = forms.RoleMembersSearchForm
permissions = ['a2_rbac.view_role']
@ -192,13 +198,16 @@ class RoleMembersView(views.HideOUColumnMixin, RoleViewMixin, views.BaseSubTable
self._can_manage_members = value
def dispatch(self, request, *args, **kwargs):
self.children = views.filter_view(self.request, self.get_object().children(include_self=False))
self.children = views.filter_view(
self.request, self.get_object().children(include_self=False, annotate=True)
)
return super().dispatch(request, *args, **kwargs)
def get_table_data(self):
if self.view_all_members:
return super().get_table_data()
members = views.filter_view(self.request, self.object.members.all())
members = members.annotate(direct=Value(True, output_field=BooleanField()))
members = self.filter_by_search(members)
return list(self.children) + list(members)
@ -211,48 +220,62 @@ class RoleMembersView(views.HideOUColumnMixin, RoleViewMixin, views.BaseSubTable
return self.search_form.is_valid() and self.search_form.cleaned_data.get('all_members')
def form_valid(self, form):
user = form.cleaned_data['user']
action = form.cleaned_data['action']
if self.can_manage_members:
if action == 'add':
if self.object.members.filter(pk=user.pk).exists():
messages.warning(self.request, _('User already in this role.'))
else:
self.object.members.add(user)
hooks.call_hooks(
'event',
name='manager-add-role-member',
user=self.request.user,
role=self.object,
member=user,
)
self.request.journal.record(
'manager.role.membership.grant', role=self.object, member=user
)
elif action == 'remove':
if not self.object.members.filter(pk=user.pk).exists():
messages.warning(self.request, _('User was not in this role.'))
else:
self.object.members.remove(user)
hooks.call_hooks(
'event',
name='manager-remove-role-member',
user=self.request.user,
role=self.object,
member=user,
)
self.request.journal.record(
'manager.role.membership.removal', role=self.object, member=user
)
else:
if not self.can_manage_members:
messages.warning(self.request, _('You are not authorized'))
elif 'user' in form.cleaned_data:
if action == 'add':
self.add_user(form.cleaned_data['user'])
elif action == 'remove':
self.remove_user(form.cleaned_data['user'])
elif 'role' in form.cleaned_data:
if action == 'add':
self.add_role(form.cleaned_data['role'])
elif action == 'remove':
self.remove_role(form.cleaned_data['role'])
return super().form_valid(form)
def add_user(self, user):
if self.object.members.filter(pk=user.pk).exists():
messages.warning(self.request, _('User already in this role.'))
else:
self.object.members.add(user)
hooks.call_hooks(
'event', name='manager-add-role-member', user=self.request.user, role=self.object, member=user
)
self.request.journal.record('manager.role.membership.grant', role=self.object, member=user)
def remove_user(self, user):
if not self.object.members.filter(pk=user.pk).exists():
messages.warning(self.request, _('User was not in this role.'))
else:
self.object.members.remove(user)
hooks.call_hooks(
'event',
name='manager-remove-role-member',
user=self.request.user,
role=self.object,
member=user,
)
self.request.journal.record('manager.role.membership.removal', role=self.object, member=user)
def add_role(self, role):
self.object.add_child(role)
hooks.call_hooks(
'event', name='manager-add-child-role', user=self.request.user, parent=self.object, child=role
)
self.request.journal.record('manager.role.inheritance.addition', parent=self.object, child=role)
def remove_role(self, role):
self.object.remove_child(role)
hooks.call_hooks(
'event', name='manager-remove-child-role', user=self.request.user, parent=self.object, child=role
)
self.request.journal.record('manager.role.inheritance.removal', parent=self.object, child=role)
def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
# if role's members can only be from the same OU we filter user based on the role's OU
if app_settings.ROLE_MEMBERS_FROM_OU:
kwargs['ou'] = self.object.ou
kwargs['role'] = self.object
return kwargs
def get_search_form_kwargs(self):
@ -783,3 +806,85 @@ class RolesJournal(views.SearchOUMixin, views.PermissionMixin, JournalViewWithCo
roles_journal = RolesJournal.as_view()
class UserOrRoleSelect2View(DetailView):
form_class = forms.ChooseUserOrRoleForm
model = Role
def get(self, request, *args, **kwargs):
if not self.request.user.is_authenticated or not hasattr(self.request.user, 'filter_by_perm'):
raise Http404('Invalid user')
role = self.get_object()
field_id = self.kwargs.get('field_id', self.request.GET.get('field_id', None))
try:
signing.loads(field_id)
except (signing.SignatureExpired, signing.BadSignature):
raise Http404('Invalid or expired signature.')
search_term = request.GET.get('term', '')
try:
page_number = int(request.GET.get('page', 1))
except ValueError:
page_number = 1
role_qs = self.form_class.get_role_queryset(self.request.user, role)
children = role.children(annotate=True)
children = children.annotate(is_direct=Cast('direct', output_field=BooleanField()))
role_qs = role_qs.exclude(pk__in=children.filter(is_direct=True))
role_qs = self.filter_queryset(role_qs, search_term, ['name', 'service__name', 'ou__name'])
role_paginator = Paginator(role_qs, 10)
try:
role_page = role_paginator.page(page_number)
except EmptyPage:
role_page = []
else:
has_next = role_page.has_next()
user_page = []
if len(role_page) < 10:
user_qs = self.form_class.get_user_queryset(self.request.user, role)
user_qs = user_qs.exclude(roles=role)
user_qs = self.filter_queryset(
user_qs, search_term, ['username', 'first_name', 'last_name', 'email']
)
page_number = page_number - role_paginator.num_pages + 1
user_paginator = Paginator(user_qs, 10)
try:
user_page = user_paginator.page(page_number)
except EmptyPage:
has_next = False
else:
has_next = user_page.has_next()
return JsonResponse(
{
"results": [self.get_choice(obj) for obj in list(role_page) + list(user_page)],
"more": has_next,
}
)
@staticmethod
def filter_queryset(qs, search_term, search_fields):
lookups = Q()
for term in [term for term in search_term.split() if not term == '']:
lookups &= reduce(Q.__or__, (Q(**{'%s__icontains' % field: term}) for field in search_fields))
return qs.filter(lookups)
def get_choice(self, obj):
if isinstance(obj, Role):
text = str(obj)
if obj.ou and get_ou_count() > 1:
text = f'{obj.ou} - {obj}'
key = 'role-%s'
elif isinstance(obj, User):
text = label_from_user(obj)
key = 'user-%s'
return {'id': key % obj.pk, 'text': text}
user_or_role_select2 = UserOrRoleSelect2View.as_view()

View File

@ -93,7 +93,7 @@ class RoleMembersTable(UserTable):
)
class Meta(UserTable.Meta):
pass
row_attrs = {"data-pk": lambda record: 'user-%s' % record.pk}
class UserOrRoleColumn(UserLinkColumn):
@ -113,6 +113,9 @@ class MixedUserRoleTable(Table):
class Meta(Table.Meta):
attrs = {'class': 'main', 'id': 'user-table'}
row_attrs = {
"data-pk": lambda record: '%s-%s' % ('user' if isinstance(record, User) else 'role', record.pk)
}
class RoleTable(Table):

View File

@ -70,7 +70,7 @@
{% endif %}
{% if view.can_manage_members %}
<form method="post" class="manager-m2m-add-form" id="add-user">
<form method="post" class="manager-m2m-add-form" id="add-member">
{% csrf_token %}
{{ form }}
<button>{% trans "Add" %}</button>

View File

@ -6,5 +6,5 @@
<th></th>
{% endblock %}
{% block table.tbody.last.column %}
<td class="remove-icon-column">{% if table.context.view.can_manage_members and row.record.direct %}<a class="js-remove-object" data-confirm="{% blocktrans with user=row.record role=table.context.object %}Do you really want to remove user &quot;{{ user }}&quot; from role &quot;{{ role }}&quot;?{% endblocktrans %}" href="#" data-pk-arg="user"><span class="icon-remove-sign"></span></a>{% endif %}</td>
<td class="remove-icon-column">{% if table.context.view.can_manage_members and row.record.direct %}<a class="js-remove-object" data-confirm="{% blocktrans with record=row.record role=table.context.object %}Do you really want to remove &quot;{{ record }}&quot; from role &quot;{{ role }}&quot;?{% endblocktrans %}" href="#" data-pk-arg="user_or_role"><span class="icon-remove-sign"></span></a>{% endif %}</td>
{% endblock %}

View File

@ -157,6 +157,11 @@ urlpatterns = required(
url(r'^roles/(?P<pk>\d+)/edit/$', role_views.edit, name='a2-manager-role-edit'),
url(r'^roles/(?P<pk>\d+)/permissions/$', role_views.permissions, name='a2-manager-role-permissions'),
url(r'^roles/(?P<pk>\d+)/journal/$', role_views.journal, name='a2-manager-role-journal'),
url(
r'^roles/(?P<pk>\d+)/user-or-role-select2.json$',
role_views.user_or_role_select2,
name='user-or-role-select2-json',
),
# Authentic2 organizational units
url(r'^organizational-units/$', ou_views.listing, name='a2-manager-ous'),
url(r'^organizational-units/add/$', ou_views.add, name='a2-manager-ou-add'),

View File

@ -402,7 +402,8 @@ def test_admin_role_user_view(db, settings, app, admin, simple_user, ou1, user_o
response = response.click('role_ou1')
select2_json = request_select2(app, response)
assert select2_json['more'] is False
assert {result['id'] for result in select2_json['results']} == {simple_user.id, user_ou1.id, admin.id}
user_ids = {int(x['id'].split('-')[1]) for x in select2_json['results'] if x['id'].startswith('user')}
assert user_ids == {simple_user.id, user_ou1.id, admin.id}
# with A2_RBAC_ROLE_ADMIN_RESTRICT_TO_OU_USERS after a reload of the admin
# page, we should only see user from the same OU as the role
@ -412,7 +413,8 @@ def test_admin_role_user_view(db, settings, app, admin, simple_user, ou1, user_o
response = response.click('role_ou1')
select2_json = request_select2(app, response)
assert select2_json['more'] is False
assert {result['id'] for result in select2_json['results']} == {user_ou1.id}
user_ids = {int(x['id'].split('-')[1]) for x in select2_json['results'] if x['id'].startswith('user')}
assert user_ids == {user_ou1.id}
def test_no_managed_ct(transactional_db, settings):

View File

@ -207,16 +207,15 @@ def test_role_members_from_ou(app, superuser, simple_user, settings):
url = reverse('a2-manager-role-members', kwargs={'pk': r.pk})
response = login(app, superuser, url)
assert not response.context['form'].fields['user'].queryset.query.where
select2_json = request_select2(app, response)
assert len(select2_json['results']) == 2
select2_json = request_select2(app, response, fetch_all=True)
assert len([x for x in select2_json['results'] if x['id'].startswith('user')]) == 2
settings.A2_MANAGER_ROLE_MEMBERS_FROM_OU = True
response = app.get(url)
assert response.context['form'].fields['user'].queryset.query.where
select2_json = request_select2(app, response)
assert len(select2_json['results']) == 1
assert select2_json['results'][0]['id'] == simple_user.pk
select2_json = request_select2(app, response, fetch_all=True)
user_choices = [x for x in select2_json['results'] if x['id'].startswith('user')]
assert len(user_choices) == 1
assert user_choices[0]['id'] == 'user-%s' % simple_user.pk
def test_manager_create_user(superuser_or_admin, app, settings):
@ -937,8 +936,8 @@ def test_manager_role_admin_permissions(app, simple_user, admin, simple_role):
# user can add members
response = app.get('/manage/roles/%s/' % simple_role.pk)
form = response.forms['add-user']
form['user'].force_value(admin.pk)
form = response.forms['add-member']
form['user_or_role'].force_value('user-%s' % admin.pk)
response = form.submit().follow()
assert simple_role in admin.roles.all()
@ -946,7 +945,7 @@ def test_manager_role_admin_permissions(app, simple_user, admin, simple_role):
q = response.pyquery.remove_namespaces()
assert q('table tbody tr td .icon-remove-sign')
token = str(response.context['csrf_token'])
params = {'action': 'remove', 'user': admin.pk, 'csrfmiddlewaretoken': token}
params = {'action': 'remove', 'user_or_role': 'user-%s' % admin.pk, 'csrfmiddlewaretoken': token}
app.post('/manage/roles/%s/' % simple_role.pk, params=params)
assert simple_role not in admin.roles.all()
@ -979,6 +978,21 @@ def test_manager_role_admin_permissions(app, simple_user, admin, simple_role):
response = app.post('/manage/roles/%s/parents/' % role.pk, params=params)
assert simple_role not in role.parents()
# user can add role as a member through role members form
response = app.get('/manage/roles/%s/' % simple_role.pk)
form = response.forms['add-member']
form['user_or_role'].force_value('role-%s' % role.pk)
response = form.submit().follow()
assert role in simple_role.children()
# user can delete role members
q = response.pyquery.remove_namespaces()
assert q('table tbody tr td .icon-remove-sign')
token = str(response.context['csrf_token'])
params = {'action': 'remove', 'user_or_role': 'role-%s' % role.pk, 'csrfmiddlewaretoken': token}
app.post('/manage/roles/%s/' % simple_role.pk, params=params)
assert role not in simple_role.children()
# try to add arbitrary role
admin_role = Role.objects.get(slug='_a2-manager')
response = app.get('/manage/roles/%s/parents/' % role.pk)

View File

@ -24,7 +24,7 @@ from authentic2.a2_rbac.models import OrganizationalUnit, Role
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.custom_user.models import User
from .utils import login, text_content
from .utils import login, request_select2, text_content
def test_manager_role_export(app, admin, ou1, role_ou1, ou2, role_ou2):
@ -475,3 +475,129 @@ def test_role_members_user_role_mixed_table(app, superuser, settings, simple_rol
resp = resp.click('Add a role as a member')
assert 'Role a' in resp.text
# add child role to child
grandchild = Role.objects.create(name='grandchild')
role.add_child(grandchild)
resp = app.get(url)
rows = [text_content(el) for el in resp.pyquery('tr td.name')]
assert rows == ['Members of role Role a', 'Members of role grandchild', 'Jôhn Dôe']
# remove icon is not shown for indirect child
assert len(resp.pyquery('tr td a.js-remove-object')) == 2
def test_role_members_user_role_mixed_field_choices(
app, superuser, settings, simple_role, simple_user, role_ou1
):
url = reverse('a2-manager-role-members', kwargs={'pk': simple_role.pk})
resp = login(app, superuser, url)
select2_json = request_select2(app, resp)
assert len(select2_json['results']) == 10
assert select2_json['more'] is True
select2_json = request_select2(app, resp, fetch_all=True)
assert len(select2_json['results']) == 17
choices = [x['text'] for x in select2_json['results']]
assert choices == [
'Default organizational unit - Managers of role "simple role"',
'Default organizational unit - Roles - Default organizational unit',
'Default organizational unit - Services - Default organizational unit',
'Default organizational unit - Users - Default organizational unit',
'OU1 - role_ou1',
'OU1 - Roles - OU1',
'OU1 - Services - OU1',
'OU1 - Users - OU1',
'Manager',
'Manager of organizational units',
'Manager of roles',
'Manager of services',
'Manager of users',
'Managers of "Default organizational unit"',
'Managers of "OU1"',
'Jôhn Dôe - user@example.net - user',
'super user - superuser@example.net - superuser',
]
select2_json = request_select2(app, resp, term='user')
choices = [x['text'] for x in select2_json['results']]
assert choices == [
'Default organizational unit - Users - Default organizational unit',
'OU1 - Users - OU1',
'Manager of users',
'Jôhn Dôe - user@example.net - user',
'super user - superuser@example.net - superuser',
]
assert select2_json['more'] is False
select2_json = request_select2(app, resp, term='Manager')
assert len(select2_json['results']) == 8
select2_json = request_select2(app, resp, term='Manager of')
assert len(select2_json['results']) == 7
select2_json = request_select2(app, resp, term='Manager of serv')
assert len(select2_json['results']) == 1
for i in range(25):
Role.objects.create(name=f'test_role_{i}')
select2_json = request_select2(app, resp, term='test_role_', fetch_all=True)
assert len(select2_json['results']) == 25
for i in range(25):
User.objects.create(username=f'test_user_{i}')
select2_json = request_select2(app, resp, term='test_user_', fetch_all=True)
assert len(select2_json['results']) == 25
def test_role_members_user_role_add_remove(app, superuser, settings, simple_role, simple_user, role_ou1):
url = reverse('a2-manager-role-members', kwargs={'pk': simple_role.pk})
resp = login(app, superuser, url)
select2_json = request_select2(app, resp, term='Jôhn')
assert len(select2_json['results']) == 1
form = resp.forms['add-member']
form['user_or_role'].force_value(select2_json['results'][0]['id'])
resp = form.submit().follow()
assert 'Jôhn Dôe' in resp.text
select2_json = request_select2(app, resp, term='Jôhn')
assert len(select2_json['results']) == 0
data_pks = [row.attrib['data-pk'] for row in resp.pyquery('table tbody tr')]
assert data_pks == ['user-%s' % simple_user.pk]
data_pk_args = [row.attrib['data-pk-arg'] for row in resp.pyquery('table tbody tr td a.js-remove-object')]
assert data_pk_args == ['user_or_role']
select2_json = request_select2(app, resp, term='role_ou1')
assert len(select2_json['results']) == 1
form = resp.forms['add-member']
form['user_or_role'].force_value(select2_json['results'][0]['id'])
resp = form.submit().follow()
assert 'role_ou1' in resp.text
select2_json = request_select2(app, resp, term='role_ou1')
assert len(select2_json['results']) == 0
data_pks = [row.attrib['data-pk'] for row in resp.pyquery('table tbody tr')]
assert data_pks == ['role-%s' % role_ou1.pk, 'user-%s' % simple_user.pk]
data_pk_args = [row.attrib['data-pk-arg'] for row in resp.pyquery('table tbody tr td a.js-remove-object')]
assert data_pk_args == ['user_or_role', 'user_or_role']
# simulate click on Jôhn Dôe delete icon
token = str(resp.context['csrf_token'])
params = {'action': 'remove', 'user_or_role': 'user-%s' % simple_user.pk, 'csrfmiddlewaretoken': token}
resp = app.post('/manage/roles/%s/' % simple_role.pk, params=params).follow()
assert 'Jôhn Dôe' not in resp.text
# simulate click on role_ou1 delete icon
token = str(resp.context['csrf_token'])
params = {'action': 'remove', 'user_or_role': 'role-%s' % role_ou1.pk, 'csrfmiddlewaretoken': token}
resp = app.post('/manage/roles/%s/' % simple_role.pk, params=params).follow()
assert 'role_ou1' not in resp.text
# invalid choices are ignored
for invalid_choice in ('', 'wrong-wrong', 'user-', 'user-xxx', 'role', 'user-99999'):
form = resp.forms['add-member']
form['user_or_role'].force_value(invalid_choice)
resp = form.submit().maybe_follow()

View File

@ -228,17 +228,25 @@ def find_free_tcp_port():
return s.getsockname()[1]
def request_select2(app, response, term='', get_kwargs=None):
def request_select2(app, response, term='', fetch_all=False, page=1, get_kwargs=None):
select2_url = response.pyquery('select')[0].attrib['data-ajax--url']
select2_field_id = response.pyquery('select')[0].attrib['data-field_id']
select2_response = app.get(
select2_url, params={'field_id': select2_field_id, 'term': term}, **(get_kwargs or {})
)
if select2_response['content-type'] == 'application/json':
return select2_response.json
else:
params = {'field_id': select2_field_id, 'term': term}
if page:
params['page'] = page
select2_response = app.get(select2_url, params=params, **(get_kwargs or {}))
if select2_response['content-type'] != 'application/json':
return select2_response
select2_json = select2_response.json
results = select2_json['results']
if fetch_all and select2_json['more']:
results.extend(request_select2(app, response, term, fetch_all, page + 1, get_kwargs)['results'])
return select2_json
@contextmanager
def run_on_commit_hooks():