misc: add checks and repairs on admin roles permissions and ou (#48372)
This commit is contained in:
parent
c0c98c2da5
commit
bdb80f27e8
|
@ -21,6 +21,7 @@ import traceback
|
|||
|
||||
from django.conf import settings
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.core.exceptions import ObjectDoesNotExist
|
||||
from django.core.management.base import BaseCommand
|
||||
from django.db import connection
|
||||
from django.db.models import Count, Q
|
||||
|
@ -161,7 +162,12 @@ class Command(BaseCommand):
|
|||
@atomic
|
||||
def check_and_repair(self, options):
|
||||
for method in [
|
||||
self.check_roles_with_lost_admin_scope,
|
||||
self.check_duplicate_manage_members_permissions,
|
||||
self.check_duplicate_permissions,
|
||||
self.check_unused_permissions,
|
||||
self.check_instance_permission_ou,
|
||||
self.check_admin_roles_ou,
|
||||
self.check_manager_of_roles,
|
||||
self.check_identifiers_uniqueness,
|
||||
]:
|
||||
|
@ -185,6 +191,13 @@ class Command(BaseCommand):
|
|||
if not answer or answer.lower() == 'n':
|
||||
return False
|
||||
|
||||
def check_roles_with_lost_admin_scope(self):
|
||||
for role in Role.objects.filter(admin_scope_ct__isnull=False):
|
||||
try:
|
||||
role.admin_scope
|
||||
except ObjectDoesNotExist:
|
||||
self.warning('Role %s has lost its admin_scope', role)
|
||||
|
||||
def check_unused_permissions(self):
|
||||
permission_ct = ContentType.objects.get_for_model(Permission)
|
||||
|
||||
|
@ -212,6 +225,115 @@ class Command(BaseCommand):
|
|||
qs.delete()
|
||||
self.success('DONE!')
|
||||
|
||||
def check_duplicate_manage_members_permissions(self):
|
||||
ct_ct = ContentType.objects.get_for_model(ContentType)
|
||||
permission_ct = ContentType.objects.get_for_model(Permission)
|
||||
manage_members_op = get_operation(MANAGE_MEMBERS_OP)
|
||||
permissions = Permission.objects.exclude(target_ct=ct_ct).filter(operation=manage_members_op)
|
||||
targets_with_duplicates = set(
|
||||
permissions
|
||||
.values_list('operation_id', 'target_ct_id', 'target_id')
|
||||
.annotate(count=Count(('operation_id', 'target_ct_id', 'target_id')))
|
||||
.filter(count__gt=1)
|
||||
.values_list('operation_id', 'target_ct_id', 'target_id')
|
||||
)
|
||||
if targets_with_duplicates:
|
||||
self.warning('Found %d manage members permissions with duplicates', len(targets_with_duplicates))
|
||||
if self.repair:
|
||||
for operation_id, target_ct_id, target_id in targets_with_duplicates:
|
||||
qs = Permission.objects.filter(target_ct_id=target_ct_id, target_id=target_id)
|
||||
for perm in qs:
|
||||
linked_admin_role = Role.objects.get(admin_scope_ct=permission_ct, admin_scope_id=perm.id)
|
||||
target = ContentType.objects.get_for_id(target_ct_id).model_class().objects.get(pk=target_id)
|
||||
self.notice(' - %s: [%s]', target, '; '.join(map(str, qs)))
|
||||
if self.do_repair():
|
||||
self.notice('Deleting duplicate manage members permissions...', ending=' ')
|
||||
for operation_id, target_ct_id, target_id in targets_with_duplicates:
|
||||
qs = Permission.objects.filter(target_ct_id=target_ct_id, target_id=target_id).order_by('id')
|
||||
role_perms = []
|
||||
for perm in qs:
|
||||
linked_admin_role = Role.objects.filter(
|
||||
admin_scope_ct=permission_ct, admin_scope_id=perm.id).first()
|
||||
if linked_admin_role:
|
||||
role_perms.append((perm, linked_admin_role))
|
||||
else:
|
||||
perm.delete()
|
||||
if role_perms:
|
||||
first_role = role_perms[0][1]
|
||||
user_ids = set()
|
||||
user_ids.update(first_role.all_members().values_list('id', flat=True))
|
||||
for perm, role in role_perms[1:]:
|
||||
user_ids.update(role.all_members().values_list('id', flat=True))
|
||||
first_role.members.add(*role.members.all())
|
||||
for child_role in role.children(include_self=False):
|
||||
first_role.add_child(child_role)
|
||||
role.delete()
|
||||
perm.delete()
|
||||
assert first_role.all_members().distinct().count() == len(user_ids)
|
||||
self.success('DONE!')
|
||||
|
||||
def check_duplicate_permissions(self):
|
||||
ct_ct = ContentType.objects.get_for_model(ContentType)
|
||||
permissions = Permission.objects.exclude(target_ct=ct_ct)
|
||||
targets_with_duplicates = set(
|
||||
permissions
|
||||
.values_list('operation_id', 'target_ct_id', 'target_id')
|
||||
.annotate(count=Count(('operation_id', 'target_ct_id', 'target_id')))
|
||||
.filter(count__gt=1)
|
||||
.values_list('operation_id', 'target_ct_id', 'target_id')
|
||||
)
|
||||
if targets_with_duplicates:
|
||||
self.warning('Found %d instance permissions with duplicates', len(targets_with_duplicates))
|
||||
if self.repair:
|
||||
for operation_id, target_ct_id, target_id in targets_with_duplicates:
|
||||
qs = Permission.objects.filter(
|
||||
operation_id=operation_id, target_ct_id=target_ct_id, target_id=target_id)
|
||||
target = ContentType.objects.get_for_id(target_ct_id).model_class().objects.get(pk=target_id)
|
||||
self.notice(' - %s: [%s]', target, '; '.join(map(str, qs)))
|
||||
if self.do_repair():
|
||||
self.notice('Deleting duplicate permissions...', ending=' ')
|
||||
for operation_id, target_ct_id, target_id in targets_with_duplicates:
|
||||
qs = list(Permission.objects.filter(
|
||||
operation_id=operation_id, target_ct_id=target_ct_id, target_id=target_id).order_by('id'))
|
||||
first_perm = qs[0]
|
||||
for perm in qs[1:]:
|
||||
perm.delete()
|
||||
if first_perm.ou:
|
||||
first_perm.ou = None
|
||||
first_perm.save()
|
||||
self.success('DONE!')
|
||||
|
||||
def check_instance_permission_ou(self):
|
||||
ct_ct = ContentType.objects.get_for_model(ContentType)
|
||||
permissions = Permission.objects.exclude(target_ct=ct_ct).filter(ou__isnull=False)
|
||||
count = permissions.count()
|
||||
if count:
|
||||
self.warning('Found %d instance permissions with an ou.', count)
|
||||
if self.do_repair():
|
||||
self.notice('Changing ou of instance permissions...', ending=' ')
|
||||
permissions.update(ou=None)
|
||||
self.success('DONE!')
|
||||
|
||||
def check_admin_roles_ou(self):
|
||||
permission_ct = ContentType.objects.get_for_model(Permission)
|
||||
role_ct = ContentType.objects.get_for_model(Role)
|
||||
manage_members_op = get_operation(MANAGE_MEMBERS_OP)
|
||||
roles_to_fix = {}
|
||||
for ou in OU.objects.all():
|
||||
roles = Role.objects.exclude(admin_scope_ct=permission_ct).filter(ou=ou)
|
||||
permissions = Permission.objects.filter(
|
||||
operation=manage_members_op, target_ct=role_ct, target_id__in=roles.values_list('id'))
|
||||
wrong_ou_roles = Role.objects.filter(
|
||||
admin_scope_ct=permission_ct, admin_scope_id__in=permissions.values_list('id')).exclude(ou=ou)
|
||||
if wrong_ou_roles:
|
||||
self.warning('Found %4d admin role with wrong ou in ou %s', wrong_ou_roles.count(), ou)
|
||||
roles_to_fix[ou] = wrong_ou_roles
|
||||
if roles_to_fix and self.do_repair():
|
||||
self.notice('Changing ou of admin roles...', ending=' ')
|
||||
for ou in roles_to_fix:
|
||||
roles_to_fix[ou].update(ou=ou)
|
||||
self.success('DONE!')
|
||||
|
||||
def check_manager_of_roles(self):
|
||||
permission_ct = ContentType.objects.get_for_model(Permission)
|
||||
role_ct = ContentType.objects.get_for_model(Role)
|
||||
|
|
Loading…
Reference in New Issue