misc: delete users synchronously (#51452)
This commit is contained in:
parent
2bdd088d43
commit
8e0f6d3cb7
|
@ -287,8 +287,7 @@ class AuthenticUserAdmin(UserAdmin):
|
|||
readonly_fields = ('uuid',)
|
||||
list_filter = UserAdmin.list_filter + (UserRealmListFilter, ExternalUserListFilter) + ('deleted',)
|
||||
list_display = ['__str__', 'ou', 'first_name', 'last_name', 'email', 'deleted']
|
||||
actions = UserAdmin.actions + ['mark_as_inactive', 'mark_as_deleted',
|
||||
'unmark_as_deleted', 'apply_deletion']
|
||||
actions = UserAdmin.actions + ['mark_as_inactive']
|
||||
|
||||
def get_fieldsets(self, request, obj=None):
|
||||
fieldsets = deepcopy(super(AuthenticUserAdmin, self).get_fieldsets(request, obj))
|
||||
|
@ -335,26 +334,6 @@ class AuthenticUserAdmin(UserAdmin):
|
|||
user.mark_as_inactive(timestamp=timestamp)
|
||||
mark_as_inactive.short_description = _('Mark as inactive')
|
||||
|
||||
@transaction.atomic
|
||||
def mark_as_deleted(self, request, queryset):
|
||||
timestamp = timezone.now()
|
||||
for user in queryset:
|
||||
user.mark_as_deleted(timestamp=timestamp)
|
||||
mark_as_deleted.short_description = _('Mark as deleted')
|
||||
|
||||
@transaction.atomic
|
||||
def unmark_as_deleted(self, request, queryset):
|
||||
for user in queryset:
|
||||
if '#' in user.email:
|
||||
user.email = user.email.rsplit('#', 1)[0]
|
||||
user.save()
|
||||
queryset.update(deleted=None)
|
||||
unmark_as_deleted.short_description = _('Un-mark as deleted')
|
||||
|
||||
def apply_deletion(self, request, queryset):
|
||||
queryset.cleanup(threshold=0)
|
||||
apply_deletion.short_description = _('Apply deletion')
|
||||
|
||||
admin.site.register(User, AuthenticUserAdmin)
|
||||
|
||||
|
||||
|
|
|
@ -14,15 +14,13 @@
|
|||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
import unicodedata
|
||||
import uuid
|
||||
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.contrib.postgres.search import TrigramDistance
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.db import models, transaction, connection
|
||||
from django.db import models, connection
|
||||
from django.db.models import F, Value, FloatField, Subquery, OuterRef, Q
|
||||
from django.db.models.functions import Lower, Coalesce
|
||||
from django.utils import timezone
|
||||
|
@ -30,7 +28,7 @@ from django.contrib.auth.models import BaseUserManager
|
|||
from django.contrib.postgres.search import SearchQuery
|
||||
|
||||
from authentic2 import app_settings
|
||||
from authentic2.models import AttributeValue, UserExternalId
|
||||
from authentic2.models import AttributeValue
|
||||
from authentic2.utils.lookups import Unaccent, ImmutableConcat
|
||||
from authentic2.utils.date import parse_date
|
||||
from authentic2.attribute_kinds import clean_number
|
||||
|
@ -145,42 +143,6 @@ class UserQuerySet(models.QuerySet):
|
|||
|
||||
return qs
|
||||
|
||||
@transaction.atomic
|
||||
def cleanup(self, threshold=600, timestamp=None):
|
||||
'''Delete all deleted users for more than 10 minutes.'''
|
||||
from .models import DeletedUser
|
||||
|
||||
not_after = (timestamp or timezone.now()) - datetime.timedelta(seconds=threshold)
|
||||
qs = self.filter(deleted__lt=not_after)
|
||||
|
||||
loaded = list(qs)
|
||||
|
||||
def log():
|
||||
logger = logging.getLogger('authentic2')
|
||||
for user in loaded:
|
||||
logger.info(u'deleted account %s', user)
|
||||
transaction.on_commit(log)
|
||||
for user in qs:
|
||||
deleted_user = DeletedUser(deleted=user.deleted, old_user_id=user.id)
|
||||
if 'email' in app_settings.A2_USER_DELETED_KEEP_DATA:
|
||||
deleted_user.old_email = user.email.rsplit('#', 1)[0]
|
||||
if 'uuid' in app_settings.A2_USER_DELETED_KEEP_DATA:
|
||||
deleted_user.old_uuid = user.uuid
|
||||
# save LDAP account references
|
||||
external_ids = UserExternalId.objects.filter(user=user).order_by('id')
|
||||
if external_ids.exists():
|
||||
deleted_user.old_data = {'external_ids': []}
|
||||
for external_id in external_ids:
|
||||
deleted_user.old_data['external_ids'].append(
|
||||
{
|
||||
'source': external_id.source,
|
||||
'external_id': external_id.external_id,
|
||||
}
|
||||
)
|
||||
external_ids.delete()
|
||||
deleted_user.save()
|
||||
qs.delete()
|
||||
|
||||
|
||||
class UserManager(BaseUserManager):
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@ class Migration(migrations.Migration):
|
|||
name='DeletedUser',
|
||||
fields=[
|
||||
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('deleted', models.DateTimeField(verbose_name='Deletion date')),
|
||||
('deleted', models.DateTimeField(verbose_name='Deletion date', auto_now_add=True)),
|
||||
('old_uuid', models.TextField(blank=True, null=True, verbose_name='Old UUID')),
|
||||
('old_user_id', models.PositiveIntegerField(blank=True, null=True, verbose_name='Old user id')),
|
||||
('old_email', models.EmailField(blank=True, max_length=254, null=True, verbose_name='Old email adress')),
|
||||
|
|
|
@ -40,7 +40,7 @@ from django_rbac.utils import get_role_parenting_model
|
|||
|
||||
from authentic2 import utils, app_settings
|
||||
from authentic2.decorators import errorcollector, RequestCache
|
||||
from authentic2.models import Service, AttributeValue, Attribute
|
||||
from authentic2.models import Service, AttributeValue, Attribute, UserExternalId
|
||||
from authentic2.validators import email_validator
|
||||
|
||||
from .managers import UserManager, UserQuerySet
|
||||
|
@ -372,20 +372,38 @@ class User(AbstractBaseUser, PermissionMixin):
|
|||
self.deactivation = timestamp or timezone.now()
|
||||
self.save(update_fields=['is_active', 'deactivation'])
|
||||
|
||||
def mark_as_deleted(self, timestamp=None, force=True, save=True):
|
||||
self.mark_as_inactive(timestamp)
|
||||
if force or not self.deleted:
|
||||
self.deleted = timestamp or timezone.now()
|
||||
if save:
|
||||
self.save(update_fields=['deleted'])
|
||||
|
||||
def set_random_password(self):
|
||||
self.set_password(base64.b64encode(os.urandom(32)).decode('ascii'))
|
||||
|
||||
@transaction.atomic
|
||||
def delete(self, **kwargs):
|
||||
deleted_user = DeletedUser(
|
||||
old_user_id=self.id)
|
||||
if 'email' in app_settings.A2_USER_DELETED_KEEP_DATA:
|
||||
deleted_user.old_email = self.email.rsplit('#', 1)[0]
|
||||
if 'uuid' in app_settings.A2_USER_DELETED_KEEP_DATA:
|
||||
deleted_user.old_uuid = self.uuid
|
||||
|
||||
# save LDAP account references
|
||||
external_ids = self.userexternalid_set.order_by('id')
|
||||
if external_ids.exists():
|
||||
deleted_user.old_data = {'external_ids': []}
|
||||
for external_id in external_ids:
|
||||
deleted_user.old_data['external_ids'].append(
|
||||
{
|
||||
'source': external_id.source,
|
||||
'external_id': external_id.external_id,
|
||||
}
|
||||
)
|
||||
external_ids.delete()
|
||||
deleted_user.save()
|
||||
return super().delete(**kwargs)
|
||||
|
||||
|
||||
class DeletedUser(models.Model):
|
||||
deleted = models.DateTimeField(
|
||||
verbose_name=_('Deletion date'))
|
||||
verbose_name=_('Deletion date'),
|
||||
auto_now_add=True)
|
||||
old_uuid = models.TextField(
|
||||
verbose_name=_('Old UUID'),
|
||||
null=True,
|
||||
|
|
|
@ -135,4 +135,4 @@ class Command(BaseCommand):
|
|||
with transaction.atomic():
|
||||
self.send_mail('authentic2/unused_account_delete', user, ctx)
|
||||
if not self.fake:
|
||||
user.mark_as_deleted(timestamp=self.now)
|
||||
user.delete()
|
||||
|
|
|
@ -23,7 +23,7 @@ from django.db import models
|
|||
from django.utils.functional import cached_property
|
||||
from django.utils.translation import ugettext_lazy as _, pgettext_lazy, ugettext
|
||||
from django.utils.html import format_html
|
||||
from django.urls import reverse
|
||||
from django.urls import reverse, reverse_lazy
|
||||
from django.core.exceptions import PermissionDenied
|
||||
from django.core.mail import EmailMultiAlternatives
|
||||
from django.template import loader
|
||||
|
@ -711,16 +711,14 @@ class UserDeleteView(BaseDeleteView):
|
|||
model = get_user_model()
|
||||
title = _('Delete user')
|
||||
template_name = 'authentic2/manager/user_delete.html'
|
||||
|
||||
def get_success_url(self):
|
||||
return reverse('a2-manager-users')
|
||||
success_url = reverse_lazy('a2-manager-users')
|
||||
|
||||
def delete(self, request, *args, **kwargs):
|
||||
self.get_object().mark_as_deleted()
|
||||
response = super().delete(request, *args, **kwargs)
|
||||
hooks.call_hooks('event', name='manager-delete-user', user=request.user,
|
||||
instance=self.object)
|
||||
request.journal.record('manager.user.deletion', target_user=self.object)
|
||||
return HttpResponseRedirect(self.get_success_url())
|
||||
return response
|
||||
|
||||
|
||||
user_delete = UserDeleteView.as_view()
|
||||
|
|
|
@ -1256,23 +1256,22 @@ class ValidateDeletionView(TemplateView):
|
|||
def post(self, request, *args, **kwargs):
|
||||
if 'cancel' not in request.POST and not self.user.deleted:
|
||||
utils.send_account_deletion_mail(self.request, self.user)
|
||||
self.user.mark_as_deleted()
|
||||
logger.info(u'deletion of account %s performed', self.user)
|
||||
hooks.call_hooks('event', name='delete-account', user=self.user)
|
||||
request.journal.record('user.deletion', user=self.user)
|
||||
if self.user == request.user:
|
||||
# No validation message displayed, as the user will surely
|
||||
# notice their own account deletion...
|
||||
return logout(request, check_referer=False)
|
||||
is_deleted_user_logged = (self.user == request.user)
|
||||
self.user.delete()
|
||||
messages.info(request, _('Deletion performed.'))
|
||||
# No real use for cancel_url or next_url here, assuming the link
|
||||
# has been received by email. We instead redirect the user to the
|
||||
# homepage.
|
||||
messages.info(request, _('Deletion performed.'))
|
||||
if is_deleted_user_logged:
|
||||
return logout(request, check_referer=False)
|
||||
return utils.redirect(request, 'auth_homepage')
|
||||
|
||||
def get_context_data(self, **kwargs):
|
||||
ctx = super(ValidateDeletionView, self).get_context_data(**kwargs)
|
||||
ctx['user'] = self.user # Not necessarily the user in request
|
||||
ctx['user'] = self.user # Not necessarily the user in request
|
||||
return ctx
|
||||
|
||||
|
||||
|
|
|
@ -387,10 +387,9 @@ def test_save_account_on_delete_user(db):
|
|||
user = User.objects.create()
|
||||
models.FcAccount.objects.create(user=user, sub='1234')
|
||||
models.FcAccount.objects.create(user=user, sub='4567', order=1)
|
||||
|
||||
user.mark_as_deleted()
|
||||
User.objects.cleanup(threshold=0, timestamp=now() + datetime.timedelta(seconds=1))
|
||||
user.delete()
|
||||
assert models.FcAccount.objects.count() == 0
|
||||
|
||||
deleted_user = DeletedUser.objects.get()
|
||||
assert deleted_user.old_data.get('fc_accounts') == [
|
||||
{
|
||||
|
|
|
@ -361,7 +361,8 @@ def test_api_users_list_search_text(app, superuser):
|
|||
results = resp.json['results']
|
||||
assert len(results) == 1
|
||||
assert results[0]['username'] == 'someuser'
|
||||
someuser.mark_as_deleted()
|
||||
someuser.delete()
|
||||
|
||||
resp = app.get('/api/users/?q=some')
|
||||
results = resp.json['results']
|
||||
assert not len(results)
|
||||
|
@ -1463,7 +1464,7 @@ def test_api_users_get_or_create(settings, app, admin):
|
|||
assert User.objects.get(id=id).check_password('secret')
|
||||
|
||||
# do not get deleted user, create a new one
|
||||
User.objects.get(id=id).mark_as_deleted()
|
||||
User.objects.get(id=id).delete()
|
||||
payload['last_name'] = 'Doe'
|
||||
resp = app.post_json('/api/users/?get_or_create=email', params=payload, status=201)
|
||||
assert id != resp.json['id']
|
||||
|
@ -1960,10 +1961,10 @@ def test_phone_normalization_nok(settings, app, admin):
|
|||
app.post_json('/api/users/', headers=headers, params=payload, status=400)
|
||||
|
||||
|
||||
def test_api_users_mark_as_deleted(app, settings, admin):
|
||||
def test_api_users_create_user_delete(app, settings, admin):
|
||||
email='foo@example.net'
|
||||
user1 = User.objects.create(username='foo', email=email)
|
||||
user1.mark_as_deleted()
|
||||
user1.delete()
|
||||
user2 = User.objects.create(username='foo2', email=email)
|
||||
|
||||
app.authorization = ('Basic', (admin.username, admin.username))
|
||||
|
@ -1982,7 +1983,7 @@ def test_api_users_mark_as_deleted(app, settings, admin):
|
|||
resp = app.get('/api/users/?email={}'.format(email))
|
||||
assert len(resp.json['results']) == 2
|
||||
|
||||
user2.mark_as_deleted()
|
||||
user2.delete()
|
||||
resp = app.get('/api/users/?email={}'.format(email))
|
||||
assert len(resp.json['results']) == 1
|
||||
|
||||
|
@ -1992,7 +1993,7 @@ def test_api_users_mark_as_deleted(app, settings, admin):
|
|||
assert resp.json['errors'] == {'email': ['email already used']}
|
||||
|
||||
|
||||
def test_api_register_mark_as_deleted(app, settings, admin):
|
||||
def test_api_register_user_delete(app, settings, admin):
|
||||
settings.A2_EMAIL_IS_UNIQUE = True
|
||||
|
||||
user = User.objects.create(username='foo', email='john.doe@example.com', ou=get_default_ou())
|
||||
|
@ -2012,12 +2013,12 @@ def test_api_register_mark_as_deleted(app, settings, admin):
|
|||
reverse('a2-api-register'), params=payload, headers=headers, status=400)
|
||||
assert response.json['errors'] == {'__all__': ['Account already exists']}
|
||||
|
||||
user.mark_as_deleted()
|
||||
user.delete()
|
||||
response = app.post_json(
|
||||
reverse('a2-api-register'), params=payload, headers=headers, status=201)
|
||||
|
||||
|
||||
def test_api_password_change_mark_as_deleted(app, settings, admin, ou1):
|
||||
def test_api_password_change_user_delete(app, settings, admin, ou1):
|
||||
app.authorization = ('Basic', (admin.username, admin.username))
|
||||
user1 = User.objects.create(
|
||||
username='john.doe', email='john.doe@example.com', ou=ou1)
|
||||
|
@ -2036,7 +2037,7 @@ def test_api_password_change_mark_as_deleted(app, settings, admin, ou1):
|
|||
}
|
||||
url = reverse('a2-api-password-change')
|
||||
response = app.post_json(url, params=payload, status=400)
|
||||
user2.mark_as_deleted()
|
||||
user2.delete()
|
||||
response = app.post_json(url, params=payload)
|
||||
assert User.objects.get(username='john.doe').check_password('password2')
|
||||
|
||||
|
|
|
@ -855,9 +855,9 @@ def test_save_account_on_delete_user(db):
|
|||
user = User.objects.create()
|
||||
OIDCAccount.objects.create(user=user, provider=provider, sub='1234')
|
||||
|
||||
user.mark_as_deleted()
|
||||
User.objects.cleanup(threshold=0, timestamp=now() + datetime.timedelta(seconds=1))
|
||||
user.delete()
|
||||
assert OIDCAccount.objects.count() == 0
|
||||
|
||||
deleted_user = DeletedUser.objects.get()
|
||||
assert deleted_user.old_data.get('oidc_accounts') == [
|
||||
{
|
||||
|
|
|
@ -275,9 +275,9 @@ def test_save_account_on_delete_user(db):
|
|||
UserSAMLIdentifier.objects.create(user=user, issuer='https://idp1.com/', name_id='1234')
|
||||
UserSAMLIdentifier.objects.create(user=user, issuer='https://idp2.com/', name_id='4567')
|
||||
|
||||
user.mark_as_deleted()
|
||||
User.objects.cleanup(threshold=0, timestamp=now() + datetime.timedelta(seconds=1))
|
||||
user.delete()
|
||||
assert UserSAMLIdentifier.objects.count() == 0
|
||||
|
||||
deleted_user = DeletedUser.objects.get()
|
||||
assert deleted_user.old_data.get('saml_accounts') == [
|
||||
{
|
||||
|
|
|
@ -1,43 +0,0 @@
|
|||
# authentic2 - versatile identity manager
|
||||
# Copyright (C) 2010-2019 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import datetime
|
||||
|
||||
from django.utils.timezone import now
|
||||
|
||||
from authentic2.custom_user.models import User, DeletedUser
|
||||
|
||||
|
||||
def test_deleted_user_cleanup(db, freezer):
|
||||
freezer.move_to('2020-01-01')
|
||||
u = User.objects.create(username='john.doe', email='john@example.com')
|
||||
assert User.objects.count() == 1
|
||||
assert DeletedUser.objects.count() == 0
|
||||
u.mark_as_deleted()
|
||||
User.objects.cleanup(timestamp=now() + datetime.timedelta(seconds=700))
|
||||
assert User.objects.count() == 0
|
||||
assert DeletedUser.objects.count() == 1
|
||||
deleted_user = DeletedUser.objects.get(old_user_id=u.id)
|
||||
assert deleted_user.deleted == u.deleted
|
||||
assert deleted_user.old_email == u.email.rsplit('#', 1)[0]
|
||||
assert deleted_user.old_uuid == u.uuid
|
||||
assert deleted_user.old_data is None
|
||||
freezer.move_to(datetime.timedelta(days=365))
|
||||
DeletedUser.cleanup()
|
||||
assert DeletedUser.objects.count() == 1, 'DeletedUser are deleted after 365 days'
|
||||
freezer.move_to(datetime.timedelta(seconds=1))
|
||||
DeletedUser.cleanup()
|
||||
assert DeletedUser.objects.count() == 0, 'DeletedUser are deleted after 365 days'
|
|
@ -28,6 +28,7 @@ import py
|
|||
from authentic2.a2_rbac.models import MANAGE_MEMBERS_OP, VIEW_OP
|
||||
from authentic2.a2_rbac.utils import get_default_ou
|
||||
from authentic2.models import UserExternalId
|
||||
from authentic2.custom_user.models import DeletedUser
|
||||
from authentic2_auth_oidc.models import OIDCProvider, OIDCAccount
|
||||
from django_rbac.models import ADMIN_OP
|
||||
from django_rbac.models import Operation
|
||||
|
@ -87,26 +88,20 @@ def test_clean_unused_account(db, simple_user, mailoutbox, freezer, settings):
|
|||
|
||||
call_command('clean-unused-accounts')
|
||||
|
||||
for user in (simple_user, ldap_user, oidc_user):
|
||||
user.refresh_from_db()
|
||||
assert not simple_user.deleted
|
||||
assert User.objects.count() == 3
|
||||
assert len(mailoutbox) == 1
|
||||
|
||||
freezer.move_to('2018-01-01 12:00:00')
|
||||
# no new mail, no deletion
|
||||
call_command('clean-unused-accounts')
|
||||
for user in (simple_user, ldap_user, oidc_user):
|
||||
user.refresh_from_db()
|
||||
assert not simple_user.deleted
|
||||
assert User.objects.count() == 3
|
||||
assert len(mailoutbox) == 1
|
||||
|
||||
freezer.move_to('2018-01-02')
|
||||
call_command('clean-unused-accounts')
|
||||
for user in (ldap_user, oidc_user):
|
||||
user.refresh_from_db()
|
||||
assert not simple_user.deleted
|
||||
simple_user.refresh_from_db()
|
||||
assert simple_user.deleted
|
||||
assert User.objects.count() == 2
|
||||
deleted_user = DeletedUser.objects.get()
|
||||
assert deleted_user.old_user_id == simple_user.id
|
||||
assert len(mailoutbox) == 2
|
||||
assert mailoutbox[-1].to == [email]
|
||||
|
||||
|
|
|
@ -19,13 +19,13 @@ from datetime import date
|
|||
from django.contrib.auth import get_user_model
|
||||
|
||||
from authentic2.models import Attribute
|
||||
from authentic2.custom_user.models import User, DeletedUser
|
||||
from django_rbac.utils import get_permission_model, get_role_model
|
||||
|
||||
import pytest
|
||||
|
||||
Permission = get_permission_model()
|
||||
Role = get_role_model()
|
||||
User = get_user_model()
|
||||
|
||||
|
||||
def test_roles_and_parents(db):
|
||||
|
@ -58,12 +58,22 @@ def test_roles_and_parents(db):
|
|||
assert r.member == []
|
||||
|
||||
|
||||
def test_user_mark_as_deleted(db):
|
||||
user1 = User.objects.create(username='foo', email='foo@example.net')
|
||||
user1.mark_as_deleted()
|
||||
def test_user_delete(db):
|
||||
user = User.objects.create(username='foo', email='foo@example.net')
|
||||
user_id = user.id
|
||||
user_uuid = user.uuid
|
||||
user.delete()
|
||||
|
||||
User.objects.create(username='foo2', email='foo@example.net')
|
||||
assert len(User.objects.filter(email='foo@example.net')) == 2
|
||||
assert len(User.objects.filter(email='foo@example.net', deleted__isnull=True)) == 1
|
||||
user = User.objects.get()
|
||||
assert user.id != user_id
|
||||
assert user.username == 'foo2'
|
||||
|
||||
deleted_user = DeletedUser.objects.get()
|
||||
assert deleted_user.old_user_id == user_id
|
||||
assert deleted_user.old_uuid == user_uuid
|
||||
assert deleted_user.old_email == 'foo@example.net'
|
||||
assert User.objects.filter(email='foo@example.net', deleted__isnull=False).count() == 0
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
|
|
@ -268,8 +268,8 @@ def test_manager_create_user(superuser_or_admin, app, settings):
|
|||
|
||||
# first user with john.doe@gmail.com/ou2 marked as deleted
|
||||
john = User.objects.get(email='john.doe@gmail.com', ou=ou2)
|
||||
john.mark_as_deleted()
|
||||
john.save()
|
||||
john.delete()
|
||||
|
||||
ou_add = app.get(url)
|
||||
form = ou_add.form
|
||||
form.set('first_name', 'John')
|
||||
|
@ -278,8 +278,7 @@ def test_manager_create_user(superuser_or_admin, app, settings):
|
|||
form.set('password1', 'ABcd1234')
|
||||
form.set('password2', 'ABcd1234')
|
||||
response = form.submit()
|
||||
assert User.objects.filter(ou=ou2).count() == 2
|
||||
assert User.objects.filter(ou=ou2, deleted__isnull=True).count() == 1
|
||||
assert User.objects.filter(ou=ou2).count() == 1
|
||||
|
||||
# create first user with john.doe2@gmail.com ou OU2 : OK
|
||||
ou_add = app.get(url)
|
||||
|
@ -290,7 +289,6 @@ def test_manager_create_user(superuser_or_admin, app, settings):
|
|||
form.set('password1', 'ABcd1234')
|
||||
form.set('password2', 'ABcd1234')
|
||||
response = form.submit().follow()
|
||||
assert User.objects.filter(ou=ou2, deleted__isnull=True).count() == 2
|
||||
|
||||
# try to change user email from john.doe2@gmail.com to
|
||||
# john.doe@gmail.com in OU2 : NOK
|
||||
|
|
|
@ -307,7 +307,7 @@ def test_search_by_attribute(app, simple_user, admin):
|
|||
# now we see only simple_user
|
||||
assert visible_users(response) == {simple_user.username}
|
||||
|
||||
simple_user.mark_as_deleted()
|
||||
simple_user.delete()
|
||||
response.form['search-text'] = 'avenue'
|
||||
response = response.form.submit()
|
||||
|
||||
|
@ -371,13 +371,13 @@ def test_export_csv_disabled_attribute(settings, app, superuser):
|
|||
assert len(line) == num_col
|
||||
|
||||
|
||||
def test_export_csv_mark_as_deleted(settings, app, superuser):
|
||||
def test_export_csv_user_delete(settings, app, superuser):
|
||||
for i in range(10):
|
||||
User.objects.create(username='user-%s' % i)
|
||||
|
||||
# users marked as deleted should not show up
|
||||
for user in User.objects.all()[0:3]:
|
||||
user.mark_as_deleted()
|
||||
user.delete()
|
||||
|
||||
response = login(app, superuser, reverse('a2-manager-users'))
|
||||
settings.A2_CACHE_ENABLED = True
|
||||
|
@ -429,7 +429,7 @@ def test_user_import(encoding, transactional_db, app, admin, ou1, admin_ou1):
|
|||
deleted_user = User.objects.create(
|
||||
email='john.doe@entrouvert.com', username='jdoe',
|
||||
first_name='John', last_name='doe')
|
||||
deleted_user.mark_as_deleted()
|
||||
deleted_user.delete()
|
||||
|
||||
user_count = User.objects.count()
|
||||
|
||||
|
@ -653,17 +653,18 @@ def test_detail_view(app, admin, simple_user):
|
|||
login(app, admin, url)
|
||||
|
||||
|
||||
def test_detail_view_deleted(app, admin, simple_user):
|
||||
def test_detail_view_user_deleted(app, admin, simple_user):
|
||||
url = '/manage/users/{user.pk}/'.format(user=simple_user)
|
||||
login(app, admin, url)
|
||||
simple_user.mark_as_deleted()
|
||||
simple_user.delete()
|
||||
app.get(url, status=404)
|
||||
|
||||
|
||||
def test_user_table_mark_as_deleted(app, admin, user_ou1, ou1):
|
||||
def test_user_table_user_deleted(app, admin, user_ou1, ou1):
|
||||
response = login(app, admin, '/manage/users/')
|
||||
assert len(response.pyquery('table.main tbody tr')) == 2
|
||||
user_ou1.mark_as_deleted()
|
||||
|
||||
user_ou1.delete()
|
||||
response = app.get('/manage/users/')
|
||||
assert len(response.pyquery('table.main tbody tr')) == 1
|
||||
|
||||
|
|
|
@ -244,9 +244,9 @@ def test_save_userexternalid_on_delete_user(db):
|
|||
UserExternalId.objects.create(user=user, source='ldap1', external_id='1234')
|
||||
UserExternalId.objects.create(user=user, source='ldap2', external_id='4567')
|
||||
|
||||
user.mark_as_deleted()
|
||||
User.objects.cleanup(threshold=0, timestamp=now() + datetime.timedelta(seconds=1))
|
||||
user.delete()
|
||||
assert UserExternalId.objects.count() == 0
|
||||
|
||||
deleted_user = DeletedUser.objects.get()
|
||||
assert deleted_user.old_data.get('external_ids') == [
|
||||
{
|
||||
|
|
|
@ -23,7 +23,7 @@ from django.urls import reverse
|
|||
from django.utils.html import escape
|
||||
from django.utils.six.moves.urllib.parse import urlparse
|
||||
|
||||
from authentic2.custom_user.models import User
|
||||
from authentic2.custom_user.models import User, DeletedUser
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
||||
|
@ -68,18 +68,13 @@ def test_account_delete(app, simple_user, mailoutbox):
|
|||
# confirmation under tests
|
||||
response = page.form.submit(name='delete')
|
||||
assert '_auth_user_id' not in app.session
|
||||
email = simple_user.email
|
||||
simple_user.refresh_from_db()
|
||||
assert not simple_user.is_active
|
||||
assert simple_user.deleted
|
||||
# no de-activation suffix on the address anymore:
|
||||
assert simple_user.email == email
|
||||
assert User.objects.filter(id=simple_user.id).count() == 0
|
||||
assert DeletedUser.objects.filter(old_user_id=simple_user.id).count() == 1
|
||||
assert len(mailoutbox) == 2
|
||||
assert 'Account deletion on testserver' == mailoutbox[1].subject
|
||||
assert mailoutbox[0].to == [email]
|
||||
assert mailoutbox[0].to == [simple_user.email]
|
||||
assert "Deletion performed" in str(response) # Set-Cookie: messages..
|
||||
assert urlparse(response.location).path == '/'
|
||||
response = response.follow().follow()
|
||||
assert response.request.url.endswith('/login/?next=/')
|
||||
|
||||
|
||||
def test_account_delete_when_logged_out(app, simple_user, mailoutbox):
|
||||
|
@ -93,12 +88,14 @@ def test_account_delete_when_logged_out(app, simple_user, mailoutbox):
|
|||
page = app.get(link)
|
||||
assert 'You are about to delete the account of <strong>%s</strong>.' % escape(
|
||||
simple_user.get_full_name()) in page.text
|
||||
response = page.form.submit(name='delete').follow().follow()
|
||||
assert not User.objects.get(pk=simple_user.pk).is_active
|
||||
response = page.form.submit(name='delete')
|
||||
assert User.objects.filter(id=simple_user.id).count() == 0
|
||||
assert DeletedUser.objects.filter(old_user_id=simple_user.id).count() == 1
|
||||
assert len(mailoutbox) == 2
|
||||
assert 'Account deletion on testserver' == mailoutbox[1].subject
|
||||
assert [simple_user.email] == mailoutbox[0].to
|
||||
assert "Deletion performed" in response.text
|
||||
assert mailoutbox[0].to == [simple_user.email]
|
||||
assert "Deletion performed" in str(response) # Set-Cookie: messages..
|
||||
assert urlparse(response.location).path == '/'
|
||||
|
||||
|
||||
def test_account_delete_by_other_user(app, simple_user, user_ou1, mailoutbox):
|
||||
|
@ -114,13 +111,15 @@ def test_account_delete_by_other_user(app, simple_user, user_ou1, mailoutbox):
|
|||
page = app.get(link)
|
||||
assert 'You are about to delete the account of <strong>%s</strong>.' % escape(
|
||||
simple_user.get_full_name()) in page.text
|
||||
response = page.form.submit(name='delete').follow()
|
||||
assert not User.objects.get(pk=simple_user.pk).is_active
|
||||
assert User.objects.get(pk=user_ou1.pk).is_active
|
||||
assert "Deletion performed" in response.text
|
||||
response = page.form.submit(name='delete')
|
||||
assert app.session['_auth_user_id'] == str(user_ou1.id)
|
||||
assert User.objects.filter(id=simple_user.id).count() == 0
|
||||
assert DeletedUser.objects.filter(old_user_id=simple_user.id).count() == 1
|
||||
assert len(mailoutbox) == 2
|
||||
assert 'Account deletion on testserver' == mailoutbox[1].subject
|
||||
assert [simple_user.email] == mailoutbox[0].to
|
||||
assert mailoutbox[0].to == [simple_user.email]
|
||||
assert "Deletion performed" in str(response) # Set-Cookie: messages..
|
||||
assert urlparse(response.location).path == '/'
|
||||
|
||||
|
||||
def test_account_delete_fake_token(app, simple_user, mailoutbox):
|
||||
|
|
Loading…
Reference in New Issue