authentic2: use direct imports for rbac models (#70963)
parent
ccbac9a310
commit
f3f20e8f6c
|
@ -5,6 +5,7 @@ from time import sleep
|
|||
|
||||
import requests
|
||||
from authentic2 import app_settings
|
||||
from authentic2.a2_rbac.models import OrganizationalUnit, Role
|
||||
from authentic2.a2_rbac.utils import get_default_ou
|
||||
from authentic2.compat_lasso import lasso
|
||||
from authentic2.models import Attribute
|
||||
|
@ -15,7 +16,6 @@ from django.contrib.contenttypes.models import ContentType
|
|||
from django.core import serializers
|
||||
from django.utils.translation import activate
|
||||
from django.utils.translation import ugettext as _
|
||||
from django_rbac.utils import get_ou_model, get_role_model
|
||||
from tenant_schemas.utils import tenant_context
|
||||
|
||||
from hobo.agent.authentic2.provisionning import Provisionning
|
||||
|
@ -185,7 +185,7 @@ class Command(hobo_deploy.Command):
|
|||
provider.metadata_url = service['saml-sp-metadata-url']
|
||||
variables = service.get('variables', {})
|
||||
if variables.get('ou-slug'):
|
||||
ou, created = get_ou_model().objects.get_or_create(
|
||||
ou, created = OrganizationalUnit.objects.get_or_create(
|
||||
slug=service['variables']['ou-slug']
|
||||
)
|
||||
ou.name = service['variables']['ou-label']
|
||||
|
@ -208,7 +208,7 @@ class Command(hobo_deploy.Command):
|
|||
create_ou = True
|
||||
break
|
||||
if create_ou:
|
||||
ou, created = get_ou_model().objects.get_or_create(name=service['title'])
|
||||
ou, created = OrganizationalUnit.objects.get_or_create(name=service['title'])
|
||||
if service_created or not provider.ou:
|
||||
provider.ou = ou
|
||||
provision_target_ous[provider.ou.id] = provider.ou
|
||||
|
@ -226,7 +226,6 @@ class Command(hobo_deploy.Command):
|
|||
service_provider.save()
|
||||
|
||||
# add a superuser role for the service
|
||||
Role = get_role_model()
|
||||
name = _('Superuser of %s') % service['title']
|
||||
su_role, created = Role.objects.get_or_create(
|
||||
service=provider, slug='_a2-hobo-superuser', defaults={'name': name}
|
||||
|
@ -272,7 +271,7 @@ class Command(hobo_deploy.Command):
|
|||
if provision_target_ous:
|
||||
# mass provision roles on new created services
|
||||
engine = Provisionning()
|
||||
roles = get_role_model().objects.all()
|
||||
roles = Role.objects.all()
|
||||
engine.notify_roles(provision_target_ous, roles, full=True)
|
||||
|
||||
for service in services:
|
||||
|
@ -298,7 +297,6 @@ class Command(hobo_deploy.Command):
|
|||
if not os.path.exists(roles_filename):
|
||||
self.logger.debug('no skeleton roles: roles file %r does not ' 'exist', roles_filename)
|
||||
return
|
||||
Role = get_role_model()
|
||||
if Role.objects.filter(ou=provider.ou).exclude(slug__startswith='_').exists():
|
||||
return
|
||||
roles = []
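Review bot: In the hunk at `@ -185`, the guard tests only `variables.get('ou-slug')`, yet the following lines index `service['variables']['ou-label']` directly. A deployment payload that carries a slug without a label therefore raises `KeyError` after `OrganizationalUnit.objects.get_or_create(...)` may already have inserted the row. The already-bound `variables` dict could be used for both lookups, with the guard widened to `if variables.get('ou-slug') and variables.get('ou-label'):` or an explicit fallback for the name. The enclosing method starts and ends outside the hunk, so any earlier validation of the payload is not visible from here.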
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
import time
|
||||
|
||||
from authentic2.a2_rbac.models import OrganizationalUnit, Role
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.core.management.base import BaseCommand
|
||||
from django_rbac.utils import get_ou_model, get_role_model
|
||||
|
||||
from hobo.agent.authentic2.provisionning import Provisionning
|
||||
|
||||
|
@ -26,7 +26,7 @@ class Command(BaseCommand):
|
|||
def handle(self, *args, **options):
|
||||
self.verbosity = options['verbosity']
|
||||
engine = Provisionning()
|
||||
ous = {ou.id: ou for ou in get_ou_model().objects.all()}
|
||||
ous = {ou.id: ou for ou in OrganizationalUnit.objects.all()}
|
||||
|
||||
if options['roles']:
|
||||
self.provision_roles(engine, ous)
|
||||
|
@ -43,7 +43,7 @@ class Command(BaseCommand):
|
|||
self.stdout.write('Done.')
|
||||
|
||||
def provision_roles(self, engine, ous):
|
||||
roles = get_role_model().objects.all()
|
||||
roles = Role.objects.all()
|
||||
if self.verbosity > 0:
|
||||
self.stdout.write(f'Provisionning {roles.count()} roles.')
|
||||
engine.notify_roles(ous, roles, full=True)
|
||||
|
@ -69,11 +69,9 @@ class Command(BaseCommand):
|
|||
time.sleep(batch_sleep)
|
||||
|
||||
if has_role_attributes:
|
||||
roles_with_attributes = (
|
||||
get_role_model().objects.filter(attributes__name='is_superuser').children()
|
||||
)
|
||||
roles_with_attributes = Role.objects.filter(attributes__name='is_superuser').children()
|
||||
else:
|
||||
roles_with_attributes = get_role_model().objects.filter(is_superuser=True).children()
|
||||
roles_with_attributes = Role.objects.filter(is_superuser=True).children()
|
||||
# first those without and admin attribute
|
||||
normal_users = qs.exclude(roles__in=roles_with_attributes)
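Review bot: The `has_role_attributes` branch selects roles with `Role.objects.filter(attributes__name='is_superuser')`, which matches on the attribute's name only, whereas the `else` branch requires `is_superuser=True`. If a role attribute named `is_superuser` can hold a false value, those roles and their children are still treated as admin roles here and their members are left out of `normal_users`. Consider filtering on the attribute's value as well, or, if name-only matching is intended, add a comment such as `# any role carrying an is_superuser attribute counts, whatever its value`. The method body starts before and continues after this hunk.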
|
||||
|
||||
|
|
|
@ -7,6 +7,8 @@ import urllib.parse
|
|||
from itertools import chain, islice
|
||||
|
||||
import requests
|
||||
from authentic2.a2_rbac.models import OrganizationalUnit as OU
|
||||
from authentic2.a2_rbac.models import Role, RoleParenting
|
||||
from authentic2.models import AttributeValue
|
||||
from authentic2.saml.models import LibertyProvider
|
||||
from django.conf import settings
|
||||
|
@ -14,7 +16,6 @@ from django.contrib.auth import get_user_model
|
|||
from django.db import connection, transaction
|
||||
from django.urls import reverse
|
||||
from django.utils.encoding import force_text
|
||||
from django_rbac.utils import get_ou_model, get_role_model, get_role_parenting_model
|
||||
|
||||
from hobo.agent.common import notify_agents
|
||||
from hobo.signature import sign_url
|
||||
|
@ -28,9 +29,6 @@ except ImportError:
|
|||
|
||||
|
||||
User = get_user_model()
|
||||
Role = get_role_model()
|
||||
OU = get_ou_model()
|
||||
RoleParenting = get_role_parenting_model()
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
|
@ -5,13 +5,12 @@ from unittest.mock import ANY, call, patch
|
|||
import lasso
|
||||
import pytest
|
||||
import requests
|
||||
from authentic2.a2_rbac.models import Role, RoleAttribute
|
||||
from authentic2.a2_rbac.models import OrganizationalUnit, Role, RoleAttribute
|
||||
from authentic2.a2_rbac.utils import get_default_ou
|
||||
from authentic2.models import Attribute, AttributeValue
|
||||
from authentic2.saml.models import LibertyProvider
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.core.management import call_command
|
||||
from django_rbac.utils import get_ou_model
|
||||
from tenant_schemas.utils import tenant_context
|
||||
|
||||
from hobo import signature
|
||||
|
@ -249,7 +248,7 @@ def test_provision_user(transactional_db, tenant, caplog):
|
|||
|
||||
# test a service in a second OU also get the provisionning message
|
||||
notify_agents.reset_mock()
|
||||
ou2 = get_ou_model().objects.create(name='ou2', slug='ou2')
|
||||
ou2 = OrganizationalUnit.objects.create(name='ou2', slug='ou2')
|
||||
LibertyProvider.objects.create(
|
||||
ou=ou2,
|
||||
name='provider2',
|
||||
|
@ -482,7 +481,7 @@ def test_provision_user(transactional_db, tenant, caplog):
|
|||
assert o['is_superuser'] is False
|
||||
|
||||
notify_agents.reset_mock()
|
||||
ou2 = get_ou_model().objects.create(name='ou2', slug='ou2')
|
||||
ou2 = OrganizationalUnit.objects.create(name='ou2', slug='ou2')
|
||||
LibertyProvider.objects.create(
|
||||
ou=get_default_ou(),
|
||||
name='provider2',
|
||||
|
|