misc: remove compatibility code with old authentic version (#72027)

This commit is contained in:
Valentin Deniaud 2022-12-05 12:22:01 +01:00
parent 6fb905f17c
commit c2aed1b25d
4 changed files with 15 additions and 80 deletions

View File

@ -14,16 +14,11 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from authentic2.a2_rbac.signals import post_soft_create, post_soft_delete
from django.apps import AppConfig
from django.conf import settings
from django.db.models.signals import m2m_changed, post_save, pre_delete, pre_save
try:
from authentic2.a2_rbac.signals import post_soft_create, post_soft_delete
except ImportError:
# legacy
from django_rbac.signals import post_soft_create, post_soft_delete
class Plugin:
def get_before_urls(self):

View File

@ -21,14 +21,6 @@ from tenant_schemas.utils import tenant_context
from hobo.agent.authentic2.provisionning import Provisionning
from hobo.agent.common.management.commands import hobo_deploy
try:
from authentic2.a2_rbac.models import RoleAttribute
has_role_attributes = True
except ImportError:
has_role_attributes = False
User = get_user_model()
@ -233,11 +225,10 @@ class Command(hobo_deploy.Command):
if su_role.name != name:
su_role.name = name
su_role.save()
if has_role_attributes:
su_role.attributes.get_or_create(name='is_superuser', kind='string', value='true')
else:
su_role.is_superuser = True
su_role.save()
su_role.is_superuser = True
su_role.save()
# pass the new attribute to the service
SAMLAttribute.objects.get_or_create(
name='is_superuser',

View File

@ -6,13 +6,6 @@ from django.core.management.base import BaseCommand
from hobo.agent.authentic2.provisionning import Provisionning
try:
from authentic2.a2_rbac.models import RoleAttribute
has_role_attributes = True
except ImportError:
has_role_attributes = False
class Command(BaseCommand):
help = 'Provision all roles or users'
@ -68,10 +61,7 @@ class Command(BaseCommand):
if users:
time.sleep(batch_sleep)
if has_role_attributes:
roles_with_attributes = Role.objects.filter(attributes__name='is_superuser').children()
else:
roles_with_attributes = Role.objects.filter(is_superuser=True).children()
roles_with_attributes = Role.objects.filter(is_superuser=True).children()
# first those without and admin attribute
normal_users = qs.exclude(roles__in=roles_with_attributes)

View File

@ -20,14 +20,6 @@ from django.utils.encoding import force_text
from hobo.agent.common import notify_agents
from hobo.signature import sign_url
try:
from authentic2.a2_rbac.models import RoleAttribute
except ImportError:
class RoleAttribute:
dummy = True
User = get_user_model()
logger = logging.getLogger(__name__)
@ -190,36 +182,21 @@ class Provisionning(threading.local):
for role in user_roles.get(user.id, []):
if role.service_id != service.pk:
continue
if hasattr(RoleAttribute, 'dummy'):
role_is_superuser = role.is_superuser
else:
for attribute in role.attributes.all():
if attribute.name == 'is_superuser' and attribute.value == 'true':
role_is_superuser = True
role_is_superuser = role.is_superuser
data['is_superuser'] = user.is_superuser or role_is_superuser
return data
# Find roles giving a superuser attribute
# If there is any role of this kind, we do one provisionning message for each user and
# each service.
if hasattr(RoleAttribute, 'dummy'):
roles_with_attributes = (
Role.objects.filter(members__in=users)
.parents(include_self=True)
.filter(is_superuser=True)
.exists()
)
else:
roles_with_attributes = (
Role.objects.filter(members__in=users)
.parents(include_self=True)
.filter(attributes__name='is_superuser')
.exists()
)
roles_with_attributes = (
Role.objects.filter(members__in=users)
.parents(include_self=True)
.filter(is_superuser=True)
.exists()
)
all_roles = Role.objects.all()
if not hasattr(RoleAttribute, 'dummy'):
all_roles = all_roles.prefetch_related('attributes')
roles = {r.id: r for r in all_roles}
user_roles = {}
parents = {}
@ -317,8 +294,6 @@ class Provisionning(threading.local):
)
roles = {role for role in roles if not is_forbidden_technical_role(role)}
if mode == 'provision' and not hasattr(RoleAttribute, 'dummy'):
self.complete_roles(roles)
if not roles:
return
@ -422,15 +397,6 @@ class Provisionning(threading.local):
qs = LibertyProvider.objects.filter(ou__isnull=True)
return [(service, service.entity_id) for service in qs]
def complete_roles(self, roles):
for role in roles:
role.emails = []
role.emails_to_members = True
role.details = ''
for attribute in role.attributes.all():
if attribute.name in ('emails', 'emails_to_members', 'details') and attribute.kind == 'json':
setattr(role, attribute.name, json.loads(attribute.value))
def get_entity_id(self):
tenant = getattr(connection, 'tenant', None)
assert tenant
@ -443,13 +409,11 @@ class Provisionning(threading.local):
# we skip new instances
if not instance.pk:
return
if not isinstance(instance, (User, Role, RoleAttribute, AttributeValue)):
if not isinstance(instance, (User, Role, AttributeValue)):
return
# ignore last_login update on login
if isinstance(instance, User) and (update_fields and set(update_fields) == {'last_login'}):
return
if isinstance(instance, RoleAttribute):
instance = instance.role
elif isinstance(instance, AttributeValue):
if not isinstance(instance.owner, User):
return
@ -465,10 +429,8 @@ class Provisionning(threading.local):
return
if not created:
return
if not isinstance(instance, (User, Role, RoleAttribute, AttributeValue)):
if not isinstance(instance, (User, Role, AttributeValue)):
return
if isinstance(instance, RoleAttribute):
instance = instance.role
elif isinstance(instance, AttributeValue):
if not isinstance(instance.owner, User):
return
@ -480,9 +442,6 @@ class Provisionning(threading.local):
return
if isinstance(instance, (User, Role)):
self.add_deleted(copy.copy(instance))
elif isinstance(instance, RoleAttribute):
instance = instance.role
self.add_saved(instance)
elif isinstance(instance, AttributeValue):
if not isinstance(instance.owner, User):
return