api: make sync endpoint adapt to permissions by OU (#71506)
gitea/authentic/pipeline/head Build queued... Details

This commit is contained in:
Paul Marillonnet 2022-11-18 11:26:49 +01:00
parent cd2a644f94
commit d542d33af8
3 changed files with 86 additions and 2 deletions

View File

@ -772,6 +772,22 @@ class UsersAPI(api_mixins.GetOrCreateMixinView, HookMixin, ExceptionHandlerMixin
queryset = queryset.none()
return queryset
def filter_queryset_by_ou_perm(self, perm):
queryset = User.objects
allowed_ous = []
if self.request.user.has_perm(perm):
return queryset
for ou in OrganizationalUnit.objects.all():
if self.request.user.has_ou_perm(perm, ou):
allowed_ous.append(ou)
if not allowed_ous:
raise PermissionDenied("You do not have permission to perform this action.")
queryset = queryset.filter(ou__in=allowed_ous)
return queryset
def update(self, request, *args, **kwargs):
kwargs['partial'] = True
return super().update(request, *args, **kwargs)
@ -827,15 +843,17 @@ class UsersAPI(api_mixins.GetOrCreateMixinView, HookMixin, ExceptionHandlerMixin
modified_users_uuids.add(users_pks[instance_pk].uuid)
return modified_users_uuids
@action(detail=False, methods=['post'], permission_classes=(DjangoPermission('custom_user.search_user'),))
@action(detail=False, methods=['post'], permission_classes=(permissions.IsAuthenticated,))
def synchronization(self, request):
serializer = self.SynchronizationSerializer(data=request.data)
queryset = self.filter_queryset_by_ou_perm('custom_user.search_user')
if not serializer.is_valid():
response = {'result': 0, 'errors': serializer.errors}
return Response(response, status.HTTP_400_BAD_REQUEST)
hooks.call_hooks('api_modify_serializer_after_validation', self, serializer)
remote_uuids = serializer.validated_data.get('known_uuids', [])
users = User.objects.filter(uuid__in=remote_uuids).only('id', 'uuid')
users = queryset.filter(uuid__in=remote_uuids).only('id', 'uuid')
unknown_uuids = self.check_unknown_uuids(remote_uuids, users)
data = {
'result': 1,

View File

@ -72,6 +72,34 @@ def test_permission_required(app, user):
app.post_json(URL, status=403)
def test_ou_permission_sufficient(app, user, ou1, users):
user.roles.clear()
r = Role.objects.get_admin_role(
ContentType.objects.get_for_model(User),
name='role',
slug='role',
ou=ou1,
operation=SEARCH_OP,
)
user.roles.add(r)
users = users[:6]
now = datetime.datetime.now()
content = {
'known_uuids': [user.uuid for user in users],
'timestamp': (now - datetime.timedelta(days=3)).isoformat(),
}
resp = app.post_json(URL, params=content, status=200)
assert len(resp.json['unknown_uuids']) == 6
for user in users[:4]:
user.ou = ou1
user.save()
resp = app.post_json(URL, params=content, status=200)
assert len(resp.json['unknown_uuids']) == 6 - 4
@pytest.fixture(scope='session')
def unknown_uuids():
return [uuid.uuid4().hex for i in range(10)]

View File

@ -7,6 +7,7 @@ from django.contrib.contenttypes.models import ContentType
from django.urls import reverse
from authentic2.a2_rbac.models import ADD_OP, SEARCH_OP, VIEW_OP, Role
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.models import APIClient
User = get_user_model()
@ -80,6 +81,43 @@ def test_api_user_synchronization(app, api_client):
assert set(response.json['unknown_uuids']) == set(unknown_uuids)
def test_api_user_synchronization_ou(app, api_client, ou1):
uuids = []
authorized_uuids = []
for index in range(100):
ou = ou1 if index % 2 else get_default_ou()
user = User.objects.create(first_name='ben', last_name='dauve', ou=ou)
uuids.append(user.uuid)
if index % 2:
authorized_uuids.append(user.uuid)
unknown_uuids = [uuid.uuid4().hex for i in range(100)]
url = reverse('a2-api-users-synchronization')
content = {
'known_uuids': uuids + unknown_uuids,
}
random.shuffle(content['known_uuids'])
app.authorization = ('Basic', ('foo', 'bar'))
response = app.post_json(url, params=content, status=401)
app.authorization = ('Basic', (api_client.identifier, api_client.password))
response = app.post_json(url, params=content, status=403)
# give custom_user.search_user permission to user
r = Role.objects.get_admin_role(
ContentType.objects.get_for_model(User),
name='role',
slug='role',
ou=ou1,
operation=SEARCH_OP,
)
api_client.apiclient_roles.add(r)
response = app.post_json(url, params=content)
assert response.json['result'] == 1
assert set(response.json['unknown_uuids']) != set(unknown_uuids)
assert set(unknown_uuids).issubset(set(response.json['unknown_uuids']))
def test_api_users_create(app, api_client):
payload = {
'username': 'janedoe',