authentic/tests/test_user_manager.py

438 lines
17 KiB
Python

# -*- coding: utf-8 -*-
# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import csv
import re
import time
import pytest
from webtest import Upload
from django.core.urlresolvers import reverse
from django.contrib.contenttypes.models import ContentType
from django.utils.six import text_type
from django_rbac.utils import get_ou_model
from authentic2.custom_user.models import User
from authentic2.models import Attribute, AttributeValue, DeletedUser
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.manager import user_import
from utils import login, get_link_from_mail, skipif_sqlite
OU = get_ou_model()
def visible_users(response):
return set(elt.text for elt in response.pyquery('td.username'))
def test_manager_user_change_email(app, superuser_or_admin, simple_user, mailoutbox):
ou = get_default_ou()
ou.validate_emails = True
ou.save()
NEW_EMAIL = 'john.doe@example.com'
assert NEW_EMAIL != simple_user.email
response = login(app, superuser_or_admin,
reverse('a2-manager-user-by-uuid-detail',
kwargs={'slug': text_type(simple_user.uuid)}))
assert 'Change user email' in response.text
# cannot click it's a submit button :/
response = app.get(reverse('a2-manager-user-by-uuid-change-email',
kwargs={'slug': text_type(simple_user.uuid)}))
assert response.form['new_email'].value == simple_user.email
response.form.set('new_email', NEW_EMAIL)
assert len(mailoutbox) == 0
response = response.form.submit().follow()
assert 'A mail was sent to john.doe@example.com to verify it.' in response.text
assert 'Change user email' in response.text
# cannot click it's a submit button :/
assert len(mailoutbox) == 1
assert simple_user.email in mailoutbox[0].body
assert NEW_EMAIL in mailoutbox[0].body
# logout
app.session.flush()
link = get_link_from_mail(mailoutbox[0])
response = app.get(link).maybe_follow()
assert (
'your request for changing your email for john.doe@example.com is successful'
in response.text)
simple_user.refresh_from_db()
assert simple_user.email == NEW_EMAIL
def test_manager_user_change_email_no_change(app, superuser_or_admin, simple_user, mailoutbox):
ou = get_default_ou()
ou.validate_emails = True
ou.save()
NEW_EMAIL = 'john.doe@example.com'
assert NEW_EMAIL != simple_user.email
response = login(app, superuser_or_admin,
reverse('a2-manager-user-by-uuid-detail',
kwargs={'slug': text_type(simple_user.uuid)}))
assert 'Change user email' in response.text
# cannot click it's a submit button :/
response = app.get(reverse('a2-manager-user-by-uuid-change-email',
kwargs={'slug': text_type(simple_user.uuid)}))
assert response.form['new_email'].value == simple_user.email
assert len(mailoutbox) == 0
response = response.form.submit().follow()
assert 'A mail was sent to john.doe@example.com to verify it.' not in response.text
def test_search_by_attribute(app, simple_user, admin):
Attribute.objects.create(name='adresse', searchable=True, kind='string')
simple_user.attributes.adresse = 'avenue du revestel'
response = login(app, admin, '/manage/users/')
# all users are visible
assert visible_users(response) == {simple_user.username, admin.username}
response.form['search-text'] = 'impasse'
response = response.form.submit()
# now all users are hidden
assert not visible_users(response) & {simple_user.username, admin.username}
response.form['search-text'] = 'avenue'
response = response.form.submit()
# now we see only simple_user
assert visible_users(response) == {simple_user.username}
@skipif_sqlite
def test_export_csv(settings, app, superuser, django_assert_num_queries):
AT_COUNT = 30
USER_COUNT = 2000
DEFAULT_BATCH_SIZE = 1000
ats = [Attribute(name='at%s' % i, label='At%s' % i, kind='string') for i in range(AT_COUNT)]
Attribute.objects.bulk_create(ats)
ats = list(Attribute.objects.all())
users = [User(username='user%s' % i) for i in range(USER_COUNT)]
User.objects.bulk_create(users)
users = list(User.objects.filter(username__startswith='user'))
user_ct = ContentType.objects.get_for_model(User)
atvs = []
for i in range(USER_COUNT):
atvs.extend([AttributeValue(
owner=users[i], attribute=ats[j], content='value-%s-%s' % (i, j)) for j in range(AT_COUNT)])
AttributeValue.objects.bulk_create(atvs)
response = login(app, superuser, reverse('a2-manager-users'))
settings.A2_CACHE_ENABLED = True
user_count = User.objects.count()
# queries should be batched to keep prefetching working without
# overspending memory for the queryset cache, 4 queries by batches
num_queries = 4 + 4 * (user_count / DEFAULT_BATCH_SIZE + bool(user_count % DEFAULT_BATCH_SIZE))
with django_assert_num_queries(num_queries):
response = response.click('CSV')
table = list(csv.reader(response.text.splitlines()))
assert len(table) == (user_count + 1)
assert len(table[0]) == (15 + AT_COUNT)
@skipif_sqlite
def test_export_csv_disabled_attribute(settings, app, superuser):
attr = Attribute.objects.create(name='attr', label='Attr', kind='string')
attr_d = Attribute.objects.create(name='attrd', label='Attrd', kind='string')
user = User.objects.create(username='user-foo')
AttributeValue.objects.create(owner=user, attribute=attr, content='attr-value')
AttributeValue.objects.create(owner=user, attribute=attr_d, content='attrd-value')
attr_d.disabled = True
attr_d.save()
response = login(app, superuser, reverse('a2-manager-users'))
settings.A2_CACHE_ENABLED = True
response = response.click('CSV')
user_count = User.objects.count()
table = list(csv.reader(response.content.splitlines()))
assert len(table) == (user_count + 1)
num_col = 15 + 1 # 1 is the number active attributes,
# disabled attribute should not show up
for line in table:
assert len(line) == num_col
def test_user_table(app, admin, user_ou1, ou1):
from authentic2.manager.utils import has_show_username
# base state, username are shown
response = login(app, admin, '/manage/users/')
assert response.pyquery('td.username')
# hide all usernames, from specific and general view
OU.objects.update(show_username=False)
has_show_username.cache.clear()
response = app.get('/manage/users/')
assert not response.pyquery('td.username')
response = app.get('/manage/users/?search-ou=%s' % get_default_ou().id)
assert not response.pyquery('td.username')
response = app.get('/manage/users/?search-ou=%s' % ou1.id)
assert not response.pyquery('td.username')
# hide username except in OU1
ou1.show_username = True
ou1.save()
has_show_username.cache.clear()
response = app.get('/manage/users/')
assert not response.pyquery('td.username')
response = app.get('/manage/users/?search-ou=%s' % get_default_ou().id)
assert not response.pyquery('td.username')
response = app.get('/manage/users/?search-ou=%s' % ou1.id)
assert response.pyquery('td.username')
@skipif_sqlite
@pytest.mark.parametrize('encoding', ['utf-8', 'cp1252', 'iso-8859-15'])
def test_user_import(encoding, transactional_db, app, admin, ou1, admin_ou1, media):
Attribute.objects.create(name='phone', kind='phone_number', label='Numéro de téléphone')
user_count = User.objects.count()
assert Attribute.objects.count() == 3
response = login(app, admin, '/manage/users/')
response = response.click('Import users')
response.form.set('import_file',
Upload(
'users.csv',
u'''email key verified,first_name,last_name,phone
tnoel@entrouvert.com,Thomas,Noël,1234
fpeters@entrouvert.com,Frédéric,Péters,5678
x,x,x,x'''.encode(encoding),
'application/octet-stream'))
response.form.set('encoding', encoding)
response.form.set('ou', str(get_default_ou().pk))
response = response.form.submit()
imports = list(user_import.UserImport.all())
assert len(imports) == 1
_import_uuid = response.location.split('/')[-2]
_import = user_import.UserImport(uuid=_import_uuid)
assert _import.exists()
response = response.follow()
response = response.forms['action-form'].submit(name='modify').follow()
response = response.forms['action-form'].submit(name='simulate')
reports = list(_import.reports)
assert len(reports) == 1
uuid = reports[0].uuid
response = response.follow()
def assert_timeout(duration, wait_function):
start = time.time()
while True:
result = wait_function()
if result is not None:
return result
assert time.time() - start < duration, '%s timed out after %s seconds' % (wait_function, duration)
time.sleep(0.001)
def wait_finished():
new_resp = response.click('Users Import')
if new_resp.pyquery('tr[data-uuid="%s"] td.state' % uuid).text() == 'Finished':
return new_resp
simulate = reports[0]
assert simulate.simulate
response = assert_timeout(2, wait_finished)
response = response.click(href=simulate.uuid)
assert len(response.pyquery('table.main tbody tr')) == 3
assert len(response.pyquery('table.main tbody tr.row-valid')) == 2
assert len(response.pyquery('table.main tbody tr.row-invalid')) == 1
assert User.objects.count() == user_count
response = response.click('Users Import')
response = response.forms['action-form'].submit(name='execute')
execute = list(report for report in _import.reports if not report.simulate)[0]
uuid = execute.uuid
response = response.follow()
response = assert_timeout(2, wait_finished)
assert User.objects.count() == user_count + 2
assert User.objects.filter(
email='tnoel@entrouvert.com',
first_name=u'Thomas',
last_name=u'Noël',
attribute_values__content='1234').count() == 1
assert User.objects.filter(
email='fpeters@entrouvert.com',
first_name=u'Frédéric',
last_name=u'Péters',
attribute_values__content='5678').count() == 1
# logout
app.session.flush()
response = login(app, admin_ou1, '/manage/users/')
app.get('/manage/users/import/', status=403)
app.get('/manage/users/import/%s/' % _import.uuid, status=403)
app.get('/manage/users/import/%s/%s/' % (_import.uuid, simulate.uuid), status=403)
app.get('/manage/users/import/%s/%s/' % (_import.uuid, execute.uuid), status=403)
def test_su_permission(app, admin, simple_user):
resp = login(app, admin, '/manage/users/%s/' % simple_user.pk)
assert len(resp.pyquery('button[name="su"]')) == 0
assert app.get('/manage/users/%s/su/' % simple_user.pk, status=403)
def test_su_superuser_post(app, app_factory, superuser, simple_user):
resp = login(app, superuser, '/manage/users/%s/' % simple_user.pk)
assert len(resp.pyquery('button[name="su"]')) == 1
su_resp = resp.form.submit(name='su')
new_app = app_factory()
new_app.get(su_resp.location).maybe_follow()
assert new_app.session['_auth_user_id'] == str(simple_user.pk)
def test_su_superuser_dialog(app, app_factory, superuser, simple_user):
resp = login(app, superuser, '/manage/users/%s/' % simple_user.pk)
assert len(resp.pyquery('button[name="su"]')) == 1
su_view_url = resp.pyquery('button[name="su"]')[0].get('data-url')
resp = app.get(su_view_url)
anchors = resp.pyquery('a#su-link')
assert len(anchors) == 1
su_url = anchors[0].get('href')
new_app = app_factory()
new_app.get(su_url).maybe_follow()
assert new_app.session['_auth_user_id'] == str(simple_user.pk)
@skipif_sqlite
def test_user_import_attributes(transactional_db, app, admin, media):
Attribute.objects.create(name='more', kind='string', label='Signe particulier')
Attribute.objects.create(name='title', kind='title', label='Titre')
Attribute.objects.create(name='bike', kind='boolean', label='Vélo')
Attribute.objects.create(name='saintsday', kind='date', label='Fête')
Attribute.objects.create(name='birthdate', kind='birthdate', label='Date de naissance')
Attribute.objects.create(name='zip', kind='fr_postcode', label='Code postal (français)')
Attribute.objects.create(name='phone', kind='phone_number', label='Numéro de téléphone')
assert Attribute.objects.count() == 9
user_count = User.objects.count()
login(app, admin, '/manage/users/')
def import_csv(csv_content):
response = app.get('/manage/users/')
response = response.click('Import users')
index = [i for i in response.forms if 'import_file' in response.forms[i].fields][0]
response.forms[index].set(
'import_file',
Upload('users.csv', csv_content.encode('utf-8'), 'application/octet-stream'))
response.forms[index].set('encoding', 'utf-8')
response.forms[index].set('ou', str(get_default_ou().pk))
response = response.forms[index].submit().follow()
response = response.forms['action-form'].submit(name='execute').follow()
start = time.time()
response = response.click('Users Import')
while 'Running' in response.text:
response = response.click('Users Import')
assert time.time() - start < 2
time.sleep(.1)
# report
urls = re.findall('<a href="(/manage/users/import/[^/]+/[^/]+/)">', response.content)
response = app.get(urls[0])
return response
csv_lines = [
u"email key verified,first_name,last_name,more,title,bike,saintsday,birthdate,zip,phone",
u"elliot@universalpictures.com,Elliott,Thomas,petit,Mr,True,2019-7-20,1972-05-26,75014,1234",
u"et@universalpictures.com,ET,the Extra-Terrestrial,long,??,False,1/2/3/4,0002-2-22,42,home"]
response = import_csv('\n'.join(csv_lines))
urls = re.findall('<a href="(/manage/users/import/[^/]+/[^/]+/)">', response.content)
response = app.get(urls[0])
assert 'Select a valid choice. ?? is not one of the available choices.' in response.content
assert 'Enter a valid date.' in response.content
assert 'birthdate must be in the past and greater or equal than 1900-01-01.' in response.content
assert 'The value must be a valid french postcode' in response.content
assert 'Phone number can start with a + an must contain only digits' in response.content
assert User.objects.count() == user_count + 1
elliot = User.objects.filter(email='elliot@universalpictures.com')[0]
assert elliot.attributes.values['more'].content == 'petit'
assert elliot.attributes.values['title'].content == 'Mr'
assert elliot.attributes.values['bike'].content == '1'
assert elliot.attributes.values['saintsday'].content == '2019-07-20'
assert elliot.attributes.values['birthdate'].content == '1972-05-26'
assert elliot.attributes.values['zip'].content == '75014'
assert elliot.attributes.values['phone'].content == '1234'
csv_lines[2] = \
u"et@universalpictures.com,ET,the Extra-Terrestrial,,,,,,42000,+888 5678"
response = import_csv('\n'.join(csv_lines))
assert '0 rows have errors' in response.content
assert User.objects.count() == user_count + 2
et = User.objects.filter(email='et@universalpictures.com')[0]
assert et.attributes.values['more'].content == ''
assert et.attributes.values['title'].content == ''
assert et.attributes.values['bike'].content == '0'
assert 'saintsday' not in et.attributes.values
assert 'birthdate' not in et.attributes.values
assert et.attributes.values['zip'].content == '42000'
assert et.attributes.values['phone'].content == '+8885678'
def test_detail_view(app, admin, simple_user):
url = '/manage/users/{user.id}/'.format(user=simple_user)
response = login(app, admin, url)
assert not response.pyquery('.a2-manager-user-deletion')
DeletedUser.objects.create(user=simple_user)
response = app.get(url)
assert response.pyquery('.a2-manager-user-deletion')