ldap_backend: periodically update mapped roles list (#75611) #28
|
@ -20,6 +20,7 @@ cron2 = minute=0,unique=1 /usr/bin/authentic2-multitenant-manage tenant_command
|
|||
cron2 = minute=5,unique=1 /usr/bin/authentic2-multitenant-manage tenant_command cleanupauthentic --all-tenants
|
||||
cron2 = minute=15,unique=1 /usr/bin/authentic2-multitenant-manage tenant_command clean-unused-accounts --all-tenants
|
||||
cron2 = minute=0,hour=0,week=0 /usr/bin/authentic2-multitenant-manage tenant_command clean-user-exports --all-tenants
|
||||
cron2 = minute=0,hour=0,week=0 /usr/bin/authentic2-multitenant-manage tenant_command update-ldap-mapped-roles-list --all-tenants
|
||||
# random sleep: try to avoid multiple machines overloading ldap server
|
||||
cron2 = minute=10,unique=1,harakiri=14400 /bin/bash -c '/bin/sleep $[RANDOM %% 180]' && /usr/bin/authentic2-multitenant-manage tenant_command sync-ldap-users --all-tenants
|
||||
cron2 = minute=30,hour=5,unique=1,harakiri=14400 /bin/bash -c '/bin/sleep $[RANDOM %% 180]' && /usr/bin/authentic2-multitenant-manage tenant_command deactivate-orphaned-ldap-users --all-tenants
|
||||
|
|
|
@ -991,7 +991,8 @@ class LDAPBackend:
|
|||
except Group.DoesNotExist:
|
||||
return None
|
||||
|
||||
def get_role(self, block, role_id):
|
||||
@classmethod
|
||||
def get_role(cls, block, role_id):
|
||||
'''Obtain a Django role'''
|
||||
kwargs = {}
|
||||
slug = None
|
||||
|
@ -1695,6 +1696,32 @@ class LDAPBackend:
|
|||
user_filter = cls.get_sync_ldap_user_filter(block)
|
||||
log.info('Search for %s returned %s users.', user_filter, count)
|
||||
|
||||
@classmethod
|
||||
def update_mapped_roles_list(cls):
|
||||
blocks = cls.get_config()
|
||||
if not blocks:
|
||||
log.info('No LDAP server configured.')
|
||||
return
|
||||
known_mapped_roles = set()
|
||||
for block in blocks:
|
||||
for dummy, role_names in block.get('group_to_role_mapping'):
|
||||
for role_name in role_names:
|
||||
role, error = cls.get_role(block, role_id=role_name)
|
||||
if role is not None:
|
||||
known_mapped_roles.add(role.id)
|
||||
else:
|
||||
log.warning(
|
||||
"error %s: couldn't retrieve role %r during mapping list update", error, role_name
|
||||
)
|
||||
# unmapped roles become assignable again
|
||||
Role.objects.filter(can_manage_members=False).exclude(id__in=known_mapped_roles).update(
|
||||
can_manage_members=True
|
||||
)
|
||||
# on the contrary mapped roles' members list is readonly
|
||||
Role.objects.filter(can_manage_members=True, id__in=known_mapped_roles).update(
|
||||
can_manage_members=False
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def deactivate_orphaned_users(cls):
|
||||
from authentic2.manager.journal_event_types import ManagerUserDeactivation
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
# authentic2 - versatile identity manager
|
||||
# Copyright (C) 2010-2023 Entr'ouver,
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
try:
|
||||
import ldap # pylint: disable=unused-import
|
||||
from ldap.filter import filter_format # pylint: disable=unused-import
|
||||
except ImportError:
|
||||
ldap = None
|
||||
|
||||
from authentic2.backends.ldap_backend import LDAPBackend
|
||||
from authentic2.base_commands import LogToConsoleCommand
|
||||
|
||||
|
||||
class Command(LogToConsoleCommand):
|
||||
loggername = 'authentic2.backends.ldap_backend'
|
||||
|
||||
def core_command(self, *args, **kwargs):
|
||||
LDAPBackend.update_mapped_roles_list()
|
|
@ -1989,6 +1989,47 @@ def test_sync_ldap_users(slapd, settings, app, db, caplog):
|
|||
) % (User.objects.first().uuid, entryuuid)
|
||||
|
||||
|
||||
def test_update_mapped_roles_manageable_members(slapd, settings, app, db, caplog):
|
||||
caplog.set_level('INFO')
|
||||
|
||||
# new roles are mapped, they shouldn't be assignable anymore
|
||||
Role.objects.create(name='LdapRole1', can_manage_members=True)
|
||||
Role.objects.create(name='LdapRole2', can_manage_members=True)
|
||||
# roles are unmapped, they should become assignable again
|
||||
Role.objects.create(name='LdapRole3', can_manage_members=False)
|
||||
Role.objects.create(name='LdapRole4', can_manage_members=False)
|
||||
|
||||
settings.LDAP_AUTH_SETTINGS = [
|
||||
{
|
||||
'url': [slapd.ldap_url],
|
||||
'basedn': 'o=ôrga',
|
||||
'use_tls': False,
|
||||
'group_to_role_mapping': [
|
||||
['cn=GrouP1,o=ôrga', ['LdapRole1']],
|
||||
['cn=GrouP2,o=ôrga', ['LdapRole2']],
|
||||
# unknown role, should not be create
|
||||
['cn=GrouP2,o=ôrga', ['LdapRole5']],
|
||||
],
|
||||
}
|
||||
]
|
||||
|
||||
management.call_command('update-ldap-mapped-roles-list')
|
||||
|
||||
assert set(
|
||||
Role.objects.filter(name__startswith='LdapRole', can_manage_members=False).values_list(
|
||||
'name', flat=True
|
||||
)
|
||||
) == {'LdapRole1', 'LdapRole2'}
|
||||
assert set(
|
||||
Role.objects.filter(name__startswith='LdapRole', can_manage_members=True).values_list(
|
||||
'name', flat=True
|
||||
)
|
||||
) == {'LdapRole3', 'LdapRole4'}
|
||||
assert not Role.objects.filter(name='LdapRole5')
|
||||
assert len(caplog.messages) == 1
|
||||
assert "couldn't retrieve role 'LdapRole5' during mapping list update" in caplog.messages[0]
|
||||
|
||||
|
||||
def test_get_users_select_realm(slapd, settings, db, caplog):
|
||||
settings.LDAP_AUTH_SETTINGS = [
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue