LDAPBackend: reactive user on login/synchronization if inactive (#52670)

This commit is contained in:
Benjamin Renard 2021-04-09 16:44:49 +02:00 committed by Benjamin Dauvergne
parent a6350703d1
commit 231f1e7b7c
6 changed files with 113 additions and 148 deletions

View File

@ -37,7 +37,6 @@ import logging
import os
import random
import time
import urllib.parse
from django.conf import settings
from django.contrib import messages
@ -338,6 +337,10 @@ def password_policy_control_messages(ctrl, attributes):
return messages
LDAP_DEACTIVATION_REASON_NOT_PRESENT = 'ldap-not-present'
LDAP_DEACTIVATION_REASON_OLD_SOURCE = 'ldap-old-source'
class LDAPUser(User):
SESSION_LDAP_DATA_KEY = 'ldap-data'
_changed = False
@ -1480,6 +1483,9 @@ class LDAPBackend(object):
if not is_user_authenticable(user):
return None
if not user.is_active and user.deactivation_reason.startswith('ldap-'):
user.mark_as_active()
user_login_success(user.get_username())
return user
@ -1565,11 +1571,13 @@ class LDAPBackend(object):
for eid in UserExternalId.objects.filter(
external_id__in=eids, user__is_active=True, source=block['realm']
):
eid.user.mark_as_inactive()
if eid.user.is_active:
eid.user.mark_as_inactive(reason=LDAP_DEACTIVATION_REASON_NOT_PRESENT)
# Handle users of old sources
uei_qs = UserExternalId.objects.exclude(source__in=[block['realm'] for block in cls.get_config()])
for user in User.objects.filter(userexternalid__in=uei_qs):
user.mark_as_inactive()
if user.is_active:
user.mark_as_inactive(reason=LDAP_DEACTIVATION_REASON_OLD_SOURCE)
@classmethod
def ad_encoding(cls, s):

View File

@ -0,0 +1,18 @@
# Generated by Django 2.2.23 on 2021-05-18 16:14
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('custom_user', '0026_remove_user_deleted'),
]
operations = [
migrations.AddField(
model_name='user',
name='deactivation_reason',
field=models.TextField(blank=True, null=True, verbose_name='Deactivation reason'),
),
]

View File

@ -177,6 +177,7 @@ class User(AbstractBaseUser, PermissionMixin):
verbose_name=_('Last account deletion alert'), null=True, blank=True
)
deactivation = models.DateTimeField(verbose_name=_('Deactivation datetime'), null=True, blank=True)
deactivation_reason = models.TextField(verbose_name=_('Deactivation reason'), null=True, blank=True)
objects = UserManager.from_queryset(UserQuerySet)()
attributes = AttributesDescriptor()
@ -360,10 +361,17 @@ class User(AbstractBaseUser, PermissionMixin):
del self._a2_attributes_cache
return super(User, self).refresh_from_db(*args, **kwargs)
def mark_as_inactive(self, timestamp=None):
def mark_as_active(self):
self.is_active = True
self.deactivation = None
self.deactivation_reason = None
self.save(update_fields=['is_active', 'deactivation', 'deactivation_reason'])
def mark_as_inactive(self, timestamp=None, reason=None):
self.is_active = False
self.deactivation = timestamp or timezone.now()
self.save(update_fields=['is_active', 'deactivation'])
self.deactivation_reason = reason
self.save(update_fields=['is_active', 'deactivation', 'deactivation_reason'])
def set_random_password(self):
self.set_password(base64.b64encode(os.urandom(32)).decode('ascii'))

View File

@ -87,6 +87,7 @@ class SerializerTests(TestCase):
'password': '',
'ou': None,
'deactivation': None,
'deactivation_reason': None,
},
},
{

View File

@ -48,6 +48,31 @@ pytestmark = pytest.mark.django_db
User = get_user_model()
USER_ATTRIBUTES_SET = set(
[
'ou',
'id',
'uuid',
'is_staff',
'is_superuser',
'first_name',
'first_name_verified',
'last_name',
'last_name_verified',
'date_joined',
'last_login',
'username',
'password',
'email',
'is_active',
'modified',
'email_verified',
'last_account_deletion_alert',
'deactivation',
'deactivation_reason',
]
)
def test_api_user_simple(logged_app):
resp = logged_app.get('/api/user/')
@ -497,34 +522,7 @@ def test_api_users_create(settings, app, api_user):
resp = app.post_json('/api/users/', params=payload, status=status)
if api_user.is_superuser or api_user.roles.exists():
assert (
set(
[
'ou',
'id',
'uuid',
'is_staff',
'is_superuser',
'first_name',
'first_name_verified',
'last_name',
'last_name_verified',
'date_joined',
'last_login',
'username',
'password',
'email',
'is_active',
'title',
'title_verified',
'modified',
'email_verified',
'last_account_deletion_alert',
'deactivation',
]
)
== set(resp.json.keys())
)
assert (USER_ATTRIBUTES_SET | set(['title', 'title_verified'])) == set(resp.json)
assert resp.json['first_name'] == payload['first_name']
assert resp.json['last_name'] == payload['last_name']
assert resp.json['email'] == payload['email']
@ -584,34 +582,7 @@ def test_api_users_create(settings, app, api_user):
resp = app.post_json('/api/users/', params=payload, status=status)
if api_user.is_superuser or api_user.roles.exists():
assert (
set(
[
'ou',
'id',
'uuid',
'is_staff',
'is_superuser',
'first_name',
'first_name_verified',
'last_name',
'last_name_verified',
'date_joined',
'last_login',
'username',
'password',
'email',
'is_active',
'title',
'title_verified',
'modified',
'email_verified',
'last_account_deletion_alert',
'deactivation',
]
)
== set(resp.json.keys())
)
assert (USER_ATTRIBUTES_SET | set(['title', 'title_verified'])) == set(resp.json)
user = get_user_model().objects.get(pk=resp.json['id'])
assert AttributeValue.objects.with_owner(user).filter(verified=True).count() == 3
assert AttributeValue.objects.with_owner(user).filter(verified=False).count() == 0
@ -761,32 +732,7 @@ def test_api_role_get_member(app, api_user, role, member):
member.roles.add(role)
resp = app.get('/api/roles/{0}/members/{1}/'.format(role.uuid, member.uuid))
assert resp.json['uuid'] == member.uuid
assert (
set(
[
'id',
'ou',
'date_joined',
'last_login',
'password',
'is_superuser',
'uuid',
'username',
'first_name',
'last_name',
'email',
'email_verified',
'is_staff',
'is_active',
'modified',
'last_account_deletion_alert',
'deactivation',
'first_name_verified',
'last_name_verified',
]
)
== set(resp.json.keys())
)
assert USER_ATTRIBUTES_SET == set(resp.json)
else:
assert resp.json['result'] == 0
assert resp.json['errors'] == 'User not allowed to view role'
@ -819,34 +765,7 @@ def test_api_role_get_member_nested(app, admin_ou1, user_ou1, role_ou1, role_ran
# api call with nested users
resp = app.get(url, params={'nested': 'true'})
assert resp.json['username'] == 'admin.ou1'
assert (
set(
[
'ou',
'id',
'uuid',
'is_staff',
'is_superuser',
'first_name',
'first_name_verified',
'last_name',
'last_name_verified',
'date_joined',
'last_login',
'username',
'password',
'email',
'is_active',
'modified',
'email_verified',
'last_account_deletion_alert',
'deactivation',
'birthdate',
'birthdate_verified',
]
)
== set(resp.json)
)
assert USER_ATTRIBUTES_SET | set(['birthdate', 'birthdate_verified']) == set(resp.json)
def test_api_role_add_member(app, api_user, role, member):
@ -1581,34 +1500,7 @@ def test_api_get_role_member_list(app, admin_ou1, user_ou1, role_ou1, role_rando
resp = app.get(url)
assert len(resp.json['results']) > 0
for user_dict in resp.json['results']:
assert (
set(
[
'ou',
'id',
'uuid',
'is_staff',
'is_superuser',
'first_name',
'first_name_verified',
'last_name',
'last_name_verified',
'date_joined',
'last_login',
'username',
'password',
'email',
'is_active',
'modified',
'email_verified',
'last_account_deletion_alert',
'deactivation',
'birthdate',
'birthdate_verified',
]
)
== set(user_dict.keys())
)
assert USER_ATTRIBUTES_SET | set(['birthdate', 'birthdate_verified']) == set(user_dict)
assert [x['username'] for x in resp.json['results']] == ['john.doe']
# api call with nested users

View File

@ -253,20 +253,58 @@ def test_deactivate_orphaned_users(slapd, settings, client, db):
conn.delete_s(DN)
ldap_backend.LDAPBackend.deactivate_orphaned_users()
list(ldap_backend.LDAPBackend.get_users())
assert (
ldap_backend.UserExternalId.objects.filter(user__is_active=False, source=block['realm']).count() == 1
ldap_backend.UserExternalId.objects.filter(
user__is_active=False,
source=block['realm'],
user__deactivation__isnull=False,
user__deactivation_reason__startswith='ldap-',
).count()
== 1
)
# deactivate an active user manually
User.objects.filter(is_active=True).first().mark_as_inactive(reason='bad user')
# rename source realm
settings.LDAP_AUTH_SETTINGS = [
{'url': [slapd.ldap_url], 'basedn': 'o=ôrga', 'use_tls': False, 'realm': 'test'}
]
settings.LDAP_AUTH_SETTINGS = []
ldap_backend.LDAPBackend.deactivate_orphaned_users()
list(ldap_backend.LDAPBackend.get_users())
assert (
ldap_backend.UserExternalId.objects.filter(
user__is_active=False,
source=block['realm'],
user__deactivation__isnull=False,
user__deactivation_reason__startswith='ldap-',
).count()
== 5
)
assert User.objects.filter(is_active=False).count() == 6
# reactivate users
settings.LDAP_AUTH_SETTINGS = [block]
list(ldap_backend.LDAPBackend.get_users())
ldap_backend.LDAPBackend.deactivate_orphaned_users()
assert (
ldap_backend.UserExternalId.objects.filter(user__is_active=False, source=block['realm']).count() == 6
ldap_backend.UserExternalId.objects.filter(
user__is_active=False,
source=block['realm'],
user__deactivation__isnull=False,
user__deactivation_reason__startswith='ldap-',
).count()
== 1
)
assert (
User.objects.filter(
is_active=True, deactivation_reason__isnull=True, deactivation__isnull=True
).count()
== 4
)
assert User.objects.filter(is_active=False).count() == 2
assert User.objects.count() == 6
@pytest.mark.django_db