ldap: add method and command to deactivate orphaned users (#6379)

This commit is contained in:
Serghei Mihai 2021-02-25 18:37:59 +01:00
parent 1c3bac6c87
commit 762cba9a2d
5 changed files with 85 additions and 2 deletions

View File

@ -5,3 +5,4 @@ MAILTO=root
5 * * * * authentic-multitenant authentic2-multitenant-manage tenant_command cleanupauthentic --all-tenants
10 * * * * authentic-multitenant authentic2-multitenant-manage tenant_command sync-ldap-users --all-tenants
0 5 * * * authentic-multitenant authentic2-multitenant-manage tenant_command clean-unused-accounts --all-tenants
30 5 * * * authentic-multitenant authentic2-multitenant-manage tenant_command deactivate-orphaned-ldap-users --all-tenants

View File

@ -5,3 +5,4 @@ MAILTO=root
5 * * * * authentic2 authentic2-manage cleanupauthentic
10 * * * * authentic2 authentic2-manage sync-ldap-users
0 5 * * * authentic2 authentic2-manage clean-unused-accounts
30 5 * * * authentic2 authentic2-manage deactivate-orphaned-ldap-users

View File

@ -1038,6 +1038,12 @@ class LDAPBackend(object):
attribute = attribute.split(':', 1)[0]
yield attribute
@classmethod
def get_sync_ldap_user_filter(cls, block):
user_filter = force_text(block['sync_ldap_users_filter'] or block['user_filter'])
user_filter = user_filter.replace('%s', '*')
return user_filter
@classmethod
def get_ldap_attributes_names(cls, block):
attributes = set()
@ -1333,8 +1339,7 @@ class LDAPBackend(object):
continue
cls.check_group_to_role_mappings(block)
user_basedn = force_text(block.get('user_basedn') or block['basedn'])
user_filter = force_text(block['sync_ldap_users_filter'] or block['user_filter'])
user_filter = user_filter.replace('%s', '*')
user_filter = cls.get_sync_ldap_user_filter(block)
attribute_names = cls.get_ldap_attributes_names(block)
results = cls.paged_search(conn, user_basedn, ldap.SCOPE_SUBTREE, user_filter, attrlist=attribute_names)
backend = cls()
@ -1342,6 +1347,35 @@ class LDAPBackend(object):
yield backend._return_user(dn, None, conn, block, attrs)
@classmethod
def deactivate_orphaned_users(cls):
for block in cls.get_config():
conn = cls.get_connection(block)
if conn is None:
continue
eids = list(UserExternalId.objects.filter(user__is_active=True,
source=block['realm']).values_list('external_id', flat=True))
basedn = force_text(block.get('user_basedn') or block['basedn'])
attribute_names = [a[0] for a in cls.attribute_name_from_external_id_tuple(block['external_id_tuples'])]
user_filter = cls.get_sync_ldap_user_filter(block)
results = cls.paged_search(conn, basedn, ldap.SCOPE_SUBTREE,
user_filter,
attrlist=attribute_names)
for dn, attrs in results:
data = attrs.copy()
data['dn'] = dn
for eid_tuple in map_text(block['external_id_tuples']):
backend = cls()
external_id = backend.build_external_id(eid_tuple, data)
if external_id:
try:
eids.remove(external_id)
except ValueError:
pass
for eid in UserExternalId.objects.filter(external_id__in=eids):
eid.user.mark_as_inactive()
@classmethod
def ad_encoding(cls, s):
'''Encode a string for AD consumption as a password'''

View File

@ -0,0 +1,25 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2021 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.core.management.base import BaseCommand
from authentic2.backends.ldap_backend import LDAPBackend
class Command(BaseCommand):
def handle(self, *args, **kwargs):
LDAPBackend.deactivate_orphaned_users()

View File

@ -218,6 +218,28 @@ def test_simple(slapd, settings, client, db):
assert 'password' not in client.session['ldap-data']
def test_deactivate_orphaned_users(slapd, settings, client, db):
settings.LDAP_AUTH_SETTINGS = [{
'url': [slapd.ldap_url],
'basedn': u'o=ôrga',
'use_tls': False,
}]
# create users as a side effect
list(ldap_backend.LDAPBackend.get_users())
block = settings.LDAP_AUTH_SETTINGS[0]
assert ldap_backend.UserExternalId.objects.filter(
user__is_active=False, source=block['realm']).count() == 0
conn = slapd.get_connection_admin()
conn.delete_s(DN)
ldap_backend.LDAPBackend.deactivate_orphaned_users()
assert ldap_backend.UserExternalId.objects.filter(
user__is_active=False, source=block['realm']).count() == 1
@pytest.mark.django_db
def test_simple_with_binddn(slapd, settings, client):
settings.LDAP_AUTH_SETTINGS = [{