ldap: record user deactivation in journal (#52671)
This commit is contained in:
parent
231f1e7b7c
commit
8268e9e69c
|
@ -1540,10 +1540,13 @@ class LDAPBackend(object):
|
|||
|
||||
@classmethod
|
||||
def deactivate_orphaned_users(cls):
|
||||
from authentic2.manager.journal_event_types import ManagerUserDeactivation
|
||||
|
||||
for block in cls.get_config():
|
||||
conn = cls.get_connection(block)
|
||||
if conn is None:
|
||||
continue
|
||||
ldap_uri = conn.get_option(ldap.OPT_URI)
|
||||
eids = list(
|
||||
UserExternalId.objects.filter(user__is_active=True, source=block['realm']).values_list(
|
||||
'external_id', flat=True
|
||||
|
@ -1573,11 +1576,15 @@ class LDAPBackend(object):
|
|||
):
|
||||
if eid.user.is_active:
|
||||
eid.user.mark_as_inactive(reason=LDAP_DEACTIVATION_REASON_NOT_PRESENT)
|
||||
ManagerUserDeactivation.record(
|
||||
target_user=eid.user, reason=LDAP_DEACTIVATION_REASON_NOT_PRESENT, origin=ldap_uri
|
||||
)
|
||||
# Handle users of old sources
|
||||
uei_qs = UserExternalId.objects.exclude(source__in=[block['realm'] for block in cls.get_config()])
|
||||
for user in User.objects.filter(userexternalid__in=uei_qs):
|
||||
if user.is_active:
|
||||
user.mark_as_inactive(reason=LDAP_DEACTIVATION_REASON_OLD_SOURCE)
|
||||
ManagerUserDeactivation.record(target_user=user, reason=LDAP_DEACTIVATION_REASON_OLD_SOURCE)
|
||||
|
||||
@classmethod
|
||||
def ad_encoding(cls, s):
|
||||
|
|
|
@ -19,6 +19,10 @@ from django.utils.translation import ugettext_lazy as _
|
|||
|
||||
from authentic2.apps.journal.models import EventTypeDefinition
|
||||
from authentic2.apps.journal.utils import form_to_old_new
|
||||
from authentic2.backends.ldap_backend import (
|
||||
LDAP_DEACTIVATION_REASON_NOT_PRESENT,
|
||||
LDAP_DEACTIVATION_REASON_OLD_SOURCE,
|
||||
)
|
||||
from authentic2.journal_event_types import EventTypeWithService, get_attributes_label
|
||||
from django_rbac.utils import get_role_model
|
||||
|
||||
|
@ -201,16 +205,38 @@ class ManagerUserDeactivation(EventTypeDefinition):
|
|||
label = _('user deactivation')
|
||||
|
||||
@classmethod
|
||||
def record(cls, user, session, target_user):
|
||||
super().record(user=user, session=session, references=[target_user])
|
||||
def record(cls, target_user, user=None, session=None, origin=None, reason=None):
|
||||
data = {'reason': reason, 'origin': origin}
|
||||
super().record(user=user, session=session, references=[target_user], data=data)
|
||||
|
||||
@classmethod
|
||||
def get_message(cls, event, context):
|
||||
(user,) = event.get_typed_references(User)
|
||||
reason = event.get_data('reason')
|
||||
if context and context == user:
|
||||
return _('deactivation by administrator')
|
||||
if reason == LDAP_DEACTIVATION_REASON_NOT_PRESENT:
|
||||
return _('automatic deactivation because the associated LDAP account does not exist anymore')
|
||||
elif reason == LDAP_DEACTIVATION_REASON_OLD_SOURCE:
|
||||
return _('automatic deactivation because the associated LDAP source has been deleted')
|
||||
else:
|
||||
return _('deactivation by administrator')
|
||||
elif user:
|
||||
return _('deactivation of user "%s"') % user.get_full_name()
|
||||
if reason == LDAP_DEACTIVATION_REASON_NOT_PRESENT:
|
||||
return (
|
||||
_(
|
||||
'automatic deactivation of user "%s" because the associated LDAP account does not exist anymore'
|
||||
)
|
||||
% user.get_full_name()
|
||||
)
|
||||
elif reason == LDAP_DEACTIVATION_REASON_OLD_SOURCE:
|
||||
return (
|
||||
_(
|
||||
'automatic deactivation of user "%s" because the associated LDAP source has been deleted'
|
||||
)
|
||||
% user.get_full_name()
|
||||
)
|
||||
else:
|
||||
return _('deactivation of user "%s"') % user.get_full_name()
|
||||
return super().get_message(event, context)
|
||||
|
||||
|
||||
|
|
|
@ -255,14 +255,17 @@ def test_deactivate_orphaned_users(slapd, settings, client, db):
|
|||
ldap_backend.LDAPBackend.deactivate_orphaned_users()
|
||||
list(ldap_backend.LDAPBackend.get_users())
|
||||
|
||||
assert (
|
||||
ldap_backend.UserExternalId.objects.filter(
|
||||
user__is_active=False,
|
||||
source=block['realm'],
|
||||
user__deactivation__isnull=False,
|
||||
user__deactivation_reason__startswith='ldap-',
|
||||
).count()
|
||||
== 1
|
||||
deactivated_user = ldap_backend.UserExternalId.objects.get(
|
||||
user__is_active=False,
|
||||
source=block['realm'],
|
||||
user__deactivation__isnull=False,
|
||||
user__deactivation_reason__startswith='ldap-',
|
||||
)
|
||||
utils.assert_event(
|
||||
'manager.user.deactivation',
|
||||
target_user=deactivated_user.user,
|
||||
reason='ldap-not-present',
|
||||
origin=slapd.ldap_url,
|
||||
)
|
||||
|
||||
# deactivate an active user manually
|
||||
|
@ -273,16 +276,28 @@ def test_deactivate_orphaned_users(slapd, settings, client, db):
|
|||
ldap_backend.LDAPBackend.deactivate_orphaned_users()
|
||||
list(ldap_backend.LDAPBackend.get_users())
|
||||
|
||||
ldap_deactivated_users = ldap_backend.UserExternalId.objects.filter(
|
||||
user__is_active=False,
|
||||
source=block['realm'],
|
||||
user__deactivation__isnull=False,
|
||||
user__deactivation_reason__startswith='ldap-',
|
||||
)
|
||||
assert ldap_deactivated_users.count() == 5
|
||||
assert (
|
||||
ldap_backend.UserExternalId.objects.filter(
|
||||
user__is_active=False,
|
||||
source=block['realm'],
|
||||
user__deactivation__isnull=False,
|
||||
user__deactivation_reason__startswith='ldap-',
|
||||
).count()
|
||||
== 5
|
||||
== 6
|
||||
)
|
||||
assert User.objects.filter(is_active=False).count() == 6
|
||||
|
||||
for ldap_user in ldap_deactivated_users.exclude(pk=deactivated_user.pk):
|
||||
utils.assert_event(
|
||||
'manager.user.deactivation',
|
||||
target_user=ldap_user.user,
|
||||
reason='ldap-old-source',
|
||||
)
|
||||
|
||||
# reactivate users
|
||||
settings.LDAP_AUTH_SETTINGS = [block]
|
||||
|
|
|
@ -251,6 +251,16 @@ def events(db, freezer):
|
|||
old_email='old@example.com',
|
||||
new_email='new@example.com',
|
||||
)
|
||||
make(
|
||||
'manager.user.deactivation',
|
||||
target_user=user,
|
||||
reason='ldap-not-present',
|
||||
)
|
||||
make(
|
||||
'manager.user.deactivation',
|
||||
target_user=user,
|
||||
reason='ldap-old-source',
|
||||
)
|
||||
|
||||
# verify we created at least one event for each type
|
||||
assert set(Event.objects.values_list("type__name", flat=True)) == set(_registry)
|
||||
|
@ -542,6 +552,18 @@ def test_global_journal(app, superuser, events):
|
|||
'type': 'user.email.change',
|
||||
'user': 'Johnny doe',
|
||||
},
|
||||
{
|
||||
'timestamp': 'Jan. 2, 2020, 5 p.m.',
|
||||
'type': 'manager.user.deactivation',
|
||||
'user': '-',
|
||||
'message': 'automatic deactivation of user "Johnny doe" because the associated LDAP account does not exist anymore',
|
||||
},
|
||||
{
|
||||
'timestamp': 'Jan. 2, 2020, 6 p.m.',
|
||||
'type': 'manager.user.deactivation',
|
||||
'user': '-',
|
||||
'message': 'automatic deactivation of user "Johnny doe" because the associated LDAP source has been deleted',
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
|
@ -727,6 +749,18 @@ def test_user_journal(app, superuser, events):
|
|||
'type': 'user.email.change',
|
||||
'user': 'Johnny doe',
|
||||
},
|
||||
{
|
||||
'timestamp': 'Jan. 2, 2020, 5 p.m.',
|
||||
'type': 'manager.user.deactivation',
|
||||
'user': '-',
|
||||
'message': 'automatic deactivation because the associated LDAP account does not exist anymore',
|
||||
},
|
||||
{
|
||||
'timestamp': 'Jan. 2, 2020, 6 p.m.',
|
||||
'type': 'manager.user.deactivation',
|
||||
'user': '-',
|
||||
'message': 'automatic deactivation because the associated LDAP source has been deleted',
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
|
@ -979,6 +1013,8 @@ def test_search(app, superuser, events):
|
|||
|
||||
table_content = [text_content(p) for p in response.pyquery('tbody td.journal-list--message-column')]
|
||||
assert table_content == [
|
||||
'automatic deactivation of user "Johnny doe" because the associated LDAP source has been deleted',
|
||||
'automatic deactivation of user "Johnny doe" because the associated LDAP account does not exist anymore',
|
||||
'deactivation of user "Johnny doe"',
|
||||
'activation of user "Johnny doe"',
|
||||
'mandatory password change at next login unset for user "Johnny doe"',
|
||||
|
|
|
@ -264,7 +264,7 @@ def text_content(node):
|
|||
return ''.join(node.itertext()) if node is not None else ''
|
||||
|
||||
|
||||
def assert_event(event_type_name, user=None, session=None, service=None, **data):
|
||||
def assert_event(event_type_name, user=None, session=None, service=None, target_user=None, **data):
|
||||
qs = Event.objects.filter(type__name=event_type_name)
|
||||
if user:
|
||||
qs = qs.filter(user=user)
|
||||
|
@ -278,6 +278,8 @@ def assert_event(event_type_name, user=None, session=None, service=None, **data)
|
|||
qs = qs.which_references(service)
|
||||
else:
|
||||
qs = qs.exclude(qs._which_references_query(models.Service))
|
||||
if target_user:
|
||||
qs = qs.which_references(target_user)
|
||||
|
||||
assert qs.count() == 1
|
||||
|
||||
|
|
Loading…
Reference in New Issue