misc: do not modify email when marking users as deleted (#48264)

This commit is contained in:
Paul Marillonnet 2020-11-05 15:18:31 +01:00
parent c1345a3356
commit 8c3902b2c2
18 changed files with 242 additions and 53 deletions

View File

@ -157,17 +157,47 @@ class RegistrationSerializer(serializers.Serializer):
'this ou'))
User = get_user_model()
if ou:
if ou.email_is_unique and \
User.objects.filter(ou=ou, email__iexact=data['email']).exists():
raise serializers.ValidationError(
_('You already have an account'))
if (ou.username_is_unique
and 'username' not in data):
raise serializers.ValidationError(
_('Username is required in this ou'))
if ou.username_is_unique and User.objects.filter(
ou=data['ou'], username=data['username']).exists():
raise serializers.ValidationError(_('You already have an account'))
if (app_settings.A2_EMAIL_IS_UNIQUE or
app_settings.A2_REGISTRATION_EMAIL_IS_UNIQUE):
if 'email' not in data:
raise serializers.ValidationError(
_('Email is required'))
if User.objects.filter(
email__iexact=data['email'],
deleted__isnull=True).exists():
raise serializers.ValidationError(
_('Account already exists'))
if ou.email_is_unique:
if 'email' not in data:
raise serializers.ValidationError(
_('Email is required in this ou'))
if User.objects.filter(
ou=ou, email__iexact=data['email'],
deleted__isnull=True).exists():
raise serializers.ValidationError(
_('Account already exists in this ou'))
if (app_settings.A2_USERNAME_IS_UNIQUE or
app_settings.A2_REGISTRATION_USERNAME_IS_UNIQUE):
if 'username' not in data:
raise serializers.ValidationError(
_('Username is required'))
if User.objects.filter(
username=data['username'],
deleted__isnull=True).exists():
raise serializers.ValidationError(
_('Account already exists'))
if ou.username_is_unique:
if 'username' not in data:
raise serializers.ValidationError(
_('Username is required in this ou'))
if User.objects.filter(
ou=ou, username=data['username'],
deleted__isnull=True).exists():
raise serializers.ValidationError(
_('Account already exists in this ou'))
return data
@ -310,7 +340,7 @@ class PasswordChangeSerializer(serializers.Serializer):
def validate(self, data):
User = get_user_model()
qs = User.objects.filter(email=data['email'])
qs = User.objects.filter(email=data['email'], deleted__isnull=True)
if data['ou']:
qs = qs.filter(ou=data['ou'])
try:
@ -495,9 +525,11 @@ class BaseUserSerializer(serializers.ModelSerializer):
and 'email' not in update_or_create_fields
and data.get('email')
and (not self.instance or data.get('email') != self.instance.email)):
if app_settings.A2_EMAIL_IS_UNIQUE and qs.filter(email=data['email']).exists():
if app_settings.A2_EMAIL_IS_UNIQUE and qs.filter(
email=data['email'], deleted__isnull=True).exists():
already_used = True
if ou and ou.email_is_unique and qs.filter(ou=ou, email=data['email']).exists():
if ou and ou.email_is_unique and qs.filter(
ou=ou, email=data['email'], deleted__isnull=True).exists():
already_used = True
errors = {}
@ -719,7 +751,7 @@ class UsersAPI(api_mixins.GetOrCreateMixinView, HookMixin, ExceptionHandlerMixin
def get_queryset(self):
User = get_user_model()
qs = User.objects.all()
qs = User.objects.filter(deleted__isnull=True)
if self.request.method == 'GET':
qs = qs.prefetch_related('attribute_values', 'attribute_values__attribute')
qs = self.request.user.filter_by_perm(['custom_user.view_user'], qs)

View File

@ -144,7 +144,7 @@ default_settings = dict(
definition='Email of users must be unique'),
A2_REGISTRATION_EMAIL_IS_UNIQUE=Setting(
default=False,
definition='Email of registererd accounts must be unique'),
definition='Email of registered accounts must be unique'),
A2_REGISTRATION_FORM_USERNAME_REGEX=Setting(
default=r'^[\w.@+-]+$',
definition='Regex to validate usernames'),

View File

@ -116,7 +116,7 @@ You can use <tt>event:login</tt> to find all events of type <tt>login</tt>.'''
return models.EventQuerySet._which_references_query(users)
def search_by_email(self, email):
users = User.objects.filter(email__iexact=email.lower())
users = User.objects.filter(email__iexact=email.lower(), deleted__isnull=True)
yield (self.query_for_users(users) | Q(data__email__iexact=email.lower()))
search_by_event.documentation = _(

View File

@ -46,7 +46,8 @@ class ModelBackend(ModelBackend):
try:
if app_settings.A2_ACCEPT_EMAIL_AUTHENTICATION \
and UserModel._meta.get_field('email'):
queries.append(models.Q(**{'email__iexact': username}))
queries.append(models.Q(**{
'email__iexact': username, 'deleted__isnull': True}))
except models.FieldDoesNotExist:
pass

View File

@ -643,10 +643,11 @@ class UserCsvImporter(object):
key_value = row[header_key].value
if header_key.field:
users = User.objects.filter(
**{header_key.name: key_value})
**{header_key.name: key_value}, deleted__isnull=True)
elif header_key.attribute:
atvs = AttributeValue.objects.filter(attribute__name=header_key.name, content=key_value)
users = User.objects.filter(attribute_values__in=atvs)
users = User.objects.filter(
attribute_values__in=atvs, deleted__isnull=True)
users = users[:2]
if users:

View File

@ -292,7 +292,7 @@ class User(AbstractBaseUser, PermissionMixin):
email_qs = qs.filter(ou=self.ou)
try:
try:
email_qs.get(email__iexact=self.email)
email_qs.get(email__iexact=self.email, deleted__isnull=True)
except MultipleObjectsReturned:
pass
except model.DoesNotExist:
@ -363,13 +363,10 @@ class User(AbstractBaseUser, PermissionMixin):
def mark_as_deleted(self, timestamp=None, force=True, save=True):
self.is_active = False
self.email_verified = False
if self.email and '#' not in self.email:
self.email += '#%d' % random.randint(1, 10000000)
if force or not self.deleted:
self.deleted = timestamp or timezone.now()
if save:
self.save(update_fields=['email', 'email_verified', 'is_active', 'deleted'])
self.save(update_fields=['is_active', 'deleted'])
class DeletedUser(models.Model):

View File

@ -45,7 +45,7 @@ class PasswordResetForm(forms.Form):
"""
email = self.cleaned_data["email"].strip()
users = get_user_queryset()
active_users = users.filter(email__iexact=email, is_active=True)
active_users = users.filter(email__iexact=email, deleted__isnull=True)
for user in active_users:
# we don't set the password to a random string, as some users should not have
# a password

View File

@ -95,7 +95,7 @@ class RegistrationCompletionFormNoPassword(profile_forms.BaseUserForm):
if app_settings.A2_REGISTRATION_EMAIL_IS_UNIQUE:
exist = False
try:
User.objects.get(email__iexact=email)
User.objects.get(email__iexact=email, deleted__isnull=True)
except User.DoesNotExist:
pass
except User.MultipleObjectsReturned:

View File

@ -394,7 +394,7 @@ class Command(BaseCommand):
if not emails:
return
users = qs.annotate(iemail=Lower('email'))
count = users.filter(iemail__in=emails).count()
count = users.filter(iemail__in=emails, deleted__isnull=True).count()
self.warning('%s, found %%d user accounts with same email:' % msg, count)
for email in emails:
self.notice('- %s :', email)

View File

@ -732,7 +732,7 @@ def send_registration_mail(request, email, ou, template_names=None, next_url=Non
**kwargs)
# existing accounts
existing_accounts = User.objects.filter(email=email)
existing_accounts = User.objects.filter(email=email, deleted__isnull=True)
if not app_settings.A2_EMAIL_IS_UNIQUE:
existing_accounts = existing_accounts.filter(ou=ou, email=email)

View File

@ -209,10 +209,12 @@ class EmailChangeVerifyView(TemplateView):
user = User.objects.get(pk=user_pk)
non_unique = False
if app_settings.A2_EMAIL_IS_UNIQUE:
non_unique = User.objects.filter(email=email).exclude(pk=user_pk).exists()
non_unique = User.objects.filter(
email=email, deleted__isnull=True).exclude(pk=user_pk).exists()
elif user.ou and user.ou.email_is_unique:
non_unique = User.objects.filter(email=email, ou=user.ou).exclude(
pk=user_pk).exists()
non_unique = User.objects.filter(
email=email, ou=user.ou, deleted__isnull=True).exclude(
pk=user_pk).exists()
if non_unique:
raise ValidationError(_('This email is already used by another account.'))
old_email = user.email
@ -935,7 +937,7 @@ class RegistrationCompletionView(CreateView):
self.ou = OU.objects.get(pk=self.token['ou'])
else:
self.ou = get_default_ou()
self.users = User.objects.filter(email__iexact=self.email) \
self.users = User.objects.filter(email__iexact=self.email, deleted__isnull=True) \
.order_by('date_joined')
if self.ou:
self.users = self.users.filter(ou=self.ou)

View File

@ -414,7 +414,7 @@ class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View):
if not user and self.user_info.get('email') and email_is_unique:
email = self.user_info['email']
User = get_user_model()
qs = User.objects.filter(email__iexact=email)
qs = User.objects.filter(email__iexact=email, deleted__isnull=True)
if not a2_app_settings.A2_EMAIL_IS_UNIQUE and default_ou.email_is_unique:
qs = qs.filter(ou=default_ou)

View File

@ -523,7 +523,7 @@ class APITest(TestCase):
content_type='application/json',
data=json.dumps(payload))
self.assertEqual(response.data['errors']['__all__'],
[_('You already have an account')])
[_('Account already exists in this ou')])
# Username is required
payload = {
'email': '1' + email,
@ -535,7 +535,7 @@ class APITest(TestCase):
content_type='application/json',
data=json.dumps(payload))
self.assertEqual(response.data['errors']['__all__'],
[_('Username is required in this ou')])
[_('Username is required')])
# Test username is unique
payload = {
'email': '1' + email,
@ -548,7 +548,7 @@ class APITest(TestCase):
content_type='application/json',
data=json.dumps(payload))
self.assertEqual(response.data['errors']['__all__'],
[_('You already have an account')])
[_('Account already exists')])
def test_register_reguser2_wrong_ou(self):
client = test.APIClient()
@ -637,7 +637,7 @@ class APITest(TestCase):
content_type='application/json',
data=json.dumps(payload))
self.assertEqual(response.data['errors']['__all__'],
[_('You already have an account')])
[_('Account already exists in this ou')])
# Username is required
payload = {
'email': '1' + email,
@ -649,7 +649,7 @@ class APITest(TestCase):
content_type='application/json',
data=json.dumps(payload))
self.assertEqual(response.data['errors']['__all__'],
[_('Username is required in this ou')])
[_('Username is required')])
# Test username is unique
payload = {
'email': '1' + email,
@ -662,7 +662,7 @@ class APITest(TestCase):
content_type='application/json',
data=json.dumps(payload))
self.assertEqual(response.data['errors']['__all__'],
[_('You already have an account')])
[_('Account already exists')])
@override_settings(A2_REQUIRED_FIELDS=['username'])
def test_email_username_is_unique_double_registration(self):

View File

@ -353,11 +353,16 @@ def test_api_users_list_by_authorized_service(app, superuser):
def test_api_users_list_search_text(app, superuser):
app.authorization = ('Basic', (superuser.username, superuser.username))
User.objects.create(username='someuser')
User = get_user_model()
someuser = User.objects.create(username='someuser')
resp = app.get('/api/users/?q=some')
results = resp.json['results']
assert len(results) == 1
assert results[0]['username'] == 'someuser'
someuser.mark_as_deleted()
resp = app.get('/api/users/?q=some')
results = resp.json['results']
assert not len(results)
def test_api_users_create(settings, app, api_user):
@ -856,7 +861,10 @@ def test_api_role_members_wrong_payload_types(app, superuser, role_random, membe
assert resp.json['errors'] == ["List elements of the 'data' dict entry must be dictionaries"]
def test_register_no_email_validation(app, admin, django_user_model):
def test_register_no_email_validation(settings, app, admin, django_user_model):
settings.A2_USERNAME_IS_UNIQUE = False
settings.A2_REGISTRATION_USERNAME_IS_UNIQUE = False
User = django_user_model
password = '12XYab'
username = 'john.doe'
email = 'john.doe@example.com'
@ -911,7 +919,10 @@ def test_register_no_email_validation(app, admin, django_user_model):
assert user.check_password(password)
def test_register_ou_no_email_validation(app, admin, django_user_model):
def test_register_ou_no_email_validation(settings, app, admin, django_user_model):
settings.A2_USERNAME_IS_UNIQUE = False
settings.A2_REGISTRATION_USERNAME_IS_UNIQUE = False
User = django_user_model
password = '12XYab'
username = 'john.doe'
email = 'john.doe@example.com'
@ -1962,3 +1973,84 @@ def test_phone_normalization_nok(settings, app, admin):
payload['phone'] = '1#2'
app.post_json('/api/users/', headers=headers, params=payload, status=400)
def test_api_users_mark_as_deleted(app, settings, admin):
email='foo@example.net'
user1 = User.objects.create(username='foo', email=email)
user1.mark_as_deleted()
user2 = User.objects.create(username='foo2', email=email)
app.authorization = ('Basic', (admin.username, admin.username))
resp = app.get('/api/users/?email={}'.format(email))
assert len(resp.json['results']) == 1
payload = {
'username': 'foo3',
'email': email,
'first_name': 'John',
'last_name': 'Doe',
}
headers = basic_authorization_header(admin)
app.post_json('/api/users/', headers=headers, params=payload, status=201)
resp = app.get('/api/users/?email={}'.format(email))
assert len(resp.json['results']) == 2
user2.mark_as_deleted()
resp = app.get('/api/users/?email={}'.format(email))
assert len(resp.json['results']) == 1
settings.A2_EMAIL_IS_UNIQUE = True
# email already used
resp = app.post_json('/api/users/', headers=headers, params=payload, status=400)
assert resp.json['errors'] == {'email': ['email already used']}
def test_api_register_mark_as_deleted(app, settings, admin):
settings.A2_EMAIL_IS_UNIQUE = True
user = User.objects.create(username='foo', email='john.doe@example.com', ou=get_default_ou())
headers = basic_authorization_header(admin)
payload = {
'username': 'john.doe',
'email': 'john.doe@example.com',
'first_name': 'John',
'last_name': 'Doe',
'password': '12XYab',
'no_email_validation': True,
'return_url': 'http://sp.example.com/validate/',
'ou': 'default',
}
response = app.post_json(
reverse('a2-api-register'), params=payload, headers=headers, status=400)
assert response.json['errors'] == {'__all__': ['Account already exists']}
user.mark_as_deleted()
response = app.post_json(
reverse('a2-api-register'), params=payload, headers=headers, status=201)
def test_api_password_change_mark_as_deleted(app, settings, admin, ou1):
app.authorization = ('Basic', (admin.username, admin.username))
user1 = User.objects.create(
username='john.doe', email='john.doe@example.com', ou=ou1)
user1.set_password('password')
user1.save()
user2 = User.objects.create(
username='john.doe2', email='john.doe@example.com', ou=ou1)
user2.set_password('password')
user1.save()
payload = {
'email': 'john.doe@example.com',
'ou': ou1.slug,
'old_password': 'password',
'new_password': 'password2',
}
url = reverse('a2-api-password-change')
response = app.post_json(url, params=payload, status=400)
user2.mark_as_deleted()
response = app.post_json(url, params=payload)
assert User.objects.get(username='john.doe').check_password('password2')

View File

@ -58,3 +58,10 @@ class CustomUserTestCase(TestCase):
self.assertIn(r.id, [rparent1.id, rparent2.id])
self.assertEqual(r.member, [])
def test_user_mark_as_deleted(self):
user1 = User.objects.create(username='foo', email='foo@example.net')
user1.mark_as_deleted()
user2 = User.objects.create(username='foo2', email='foo@example.net')
assert len(User.objects.filter(email='foo@example.net')) == 2
assert len(User.objects.filter(
email='foo@example.net', deleted__isnull=True)) == 1

View File

@ -266,6 +266,21 @@ def test_manager_create_user(superuser_or_admin, app, settings):
assert User.objects.filter(ou=ou2).count() == 1
assert 'This email address is already in use.' in response
# first user with john.doe@gmail.com/ou2 marked as deleted
john = User.objects.get(email='john.doe@gmail.com', ou=ou2)
john.mark_as_deleted()
john.save()
ou_add = app.get(url)
form = ou_add.form
form.set('first_name', 'John')
form.set('last_name', 'Doe')
form.set('email', 'john.doe@gmail.com')
form.set('password1', 'ABcd1234')
form.set('password2', 'ABcd1234')
response = form.submit()
assert User.objects.filter(ou=ou2).count() == 2
assert User.objects.filter(ou=ou2, deleted__isnull=True).count() == 1
# create first user with john.doe2@gmail.com ou OU2 : OK
ou_add = app.get(url)
form = ou_add.form
@ -275,7 +290,7 @@ def test_manager_create_user(superuser_or_admin, app, settings):
form.set('password1', 'ABcd1234')
form.set('password2', 'ABcd1234')
response = form.submit().follow()
assert User.objects.filter(ou=ou2).count() == 2
assert User.objects.filter(ou=ou2, deleted__isnull=True).count() == 2
# try to change user email from john.doe2@gmail.com to
# john.doe@gmail.com in OU2 : NOK

View File

@ -270,6 +270,12 @@ def test_search_by_attribute(app, simple_user, admin):
# now we see only simple_user
assert visible_users(response) == {simple_user.username}
simple_user.mark_as_deleted()
response.form['search-text'] = 'avenue'
response = response.form.submit()
assert visible_users(response) == set()
def test_export_csv(settings, app, superuser, django_assert_num_queries):
AT_COUNT = 30
@ -328,6 +334,22 @@ def test_export_csv_disabled_attribute(settings, app, superuser):
assert len(line) == num_col
def test_export_csv_mark_as_deleted(settings, app, superuser):
for i in range(10):
User.objects.create(username='user-%s' % i)
# users marked as deleted should not show up
for user in User.objects.all()[0:3]:
user.mark_as_deleted()
response = login(app, superuser, reverse('a2-manager-users'))
settings.A2_CACHE_ENABLED = True
response = response.click('CSV')
table = list(csv.reader(response.text.splitlines()))
# superuser + ten created users + csv header - three users marked as deteled
assert len(table) == (1 + 10 + 1 - 3)
def test_user_table(app, admin, user_ou1, ou1):
from authentic2.manager.utils import has_show_username
@ -367,6 +389,11 @@ def test_user_table(app, admin, user_ou1, ou1):
def test_user_import(encoding, transactional_db, app, admin, ou1, admin_ou1):
Attribute.objects.create(name='phone', kind='phone_number', label='Numéro de téléphone')
deleted_user = User.objects.create(
email='john.doe@entrouvert.com', username='jdoe',
first_name='John', last_name='doe')
deleted_user.mark_as_deleted()
user_count = User.objects.count()
assert Attribute.objects.count() == 3
@ -377,9 +404,10 @@ def test_user_import(encoding, transactional_db, app, admin, ou1, admin_ou1):
response.form.set('import_file',
Upload(
'users.csv',
u'''email key verified,first_name,last_name,phone
'''email key verified,first_name,last_name,phone
tnoel@entrouvert.com,Thomas,Noël,1234
fpeters@entrouvert.com,Frédéric,Péters,5678
john.doe@entrouvert.com,John,Doe,9101112
x,x,x,x'''.encode(encoding),
'application/octet-stream'))
response.form.set('encoding', encoding)
@ -425,8 +453,8 @@ x,x,x,x'''.encode(encoding),
response = response.click(href=simulate.uuid)
assert len(response.pyquery('table.main tbody tr')) == 4
assert len(response.pyquery('table.main tbody tr.row-valid')) == 2
assert len(response.pyquery('table.main tbody tr')) == 5
assert len(response.pyquery('table.main tbody tr.row-valid')) == 3
assert len(response.pyquery('table.main tbody tr.row-invalid')) == 2
assert len(response.pyquery('tr.row-errors')) == 0
@ -446,17 +474,22 @@ x,x,x,x'''.encode(encoding),
response = response.follow()
response = assert_timeout(2, wait_finished)
assert User.objects.count() == user_count + 2
assert User.objects.count() == user_count + 3
assert User.objects.filter(
email='tnoel@entrouvert.com',
first_name=u'Thomas',
last_name=u'Noël',
first_name='Thomas',
last_name='Noël',
attribute_values__content='1234').count() == 1
assert User.objects.filter(
email='fpeters@entrouvert.com',
first_name=u'Frédéric',
last_name=u'Péters',
first_name='Frédéric',
last_name='Péters',
attribute_values__content='5678').count() == 1
assert User.objects.filter(
email='john.doe@entrouvert.com',
first_name='John',
last_name='Doe',
attribute_values__content='9101112').count() == 1
# logout
app.session.flush()
@ -590,6 +623,14 @@ def test_detail_view_deleted(app, admin, simple_user):
app.get(url, status=404)
def test_user_table_mark_as_deleted(app, admin, user_ou1, ou1):
response = login(app, admin, '/manage/users/')
assert len(response.pyquery('table.main tbody tr')) == 2
user_ou1.mark_as_deleted()
response = app.get('/manage/users/')
assert len(response.pyquery('table.main tbody tr')) == 1
def test_user_import_row_error_display(transactional_db, app, admin):
User.objects.create(first_name='Elliott', last_name='1', ou=get_default_ou())
User.objects.create(first_name='Elliott', last_name='2', ou=get_default_ou())

View File

@ -72,7 +72,8 @@ def test_account_delete(app, simple_user, mailoutbox):
simple_user.refresh_from_db()
assert not simple_user.is_active
assert simple_user.deleted
assert simple_user.email != email
# no de-activation suffix on the address anymore:
assert simple_user.email == email
assert len(mailoutbox) == 2
assert 'Account deletion on testserver' == mailoutbox[1].subject
assert mailoutbox[0].to == [email]