authentic/tests/test_user_manager.py

319 lines
11 KiB
Python

# -*- coding: utf-8 -*-
# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import csv
import time
import pytest
from webtest import Upload
from django.core.urlresolvers import reverse
from django.contrib.contenttypes.models import ContentType
from django.utils.six import text_type
from django_rbac.utils import get_ou_model
from authentic2.custom_user.models import User
from authentic2.models import Attribute, AttributeValue
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.manager import user_import
from utils import login, get_link_from_mail, skipif_sqlite
# Organizational-unit model class as returned by django_rbac's
# get_ou_model(); the tests below use it to update OUs in bulk.
OU = get_ou_model()
def visible_users(response):
    """Return the set of usernames listed in *response*.

    The usernames are the ``text`` of every element matched by the
    ``td.username`` selector through ``response.pyquery``.
    """
    return {cell.text for cell in response.pyquery('td.username')}
def test_manager_user_change_email(app, superuser_or_admin, simple_user, mailoutbox):
    """Change a user's email from the manager and confirm it by mail.

    ``validate_emails`` is enabled on the default OU; the test submits a
    new address, follows the link found in the single mail that was sent
    and finally checks the address stored on the user.
    """
    default_ou = get_default_ou()
    default_ou.validate_emails = True
    default_ou.save()

    NEW_EMAIL = 'john.doe@example.com'
    assert NEW_EMAIL != simple_user.email

    slug_kwargs = {'slug': text_type(simple_user.uuid)}
    detail_url = reverse('a2-manager-user-by-uuid-detail', kwargs=slug_kwargs)
    page = login(app, superuser_or_admin, detail_url)
    assert 'Change user email' in page.text

    # The entry on the detail page is a submit button and cannot be
    # clicked, so the change-email view is requested directly.
    change_url = reverse('a2-manager-user-by-uuid-change-email', kwargs=slug_kwargs)
    page = app.get(change_url)
    email_form = page.form
    assert email_form['new_email'].value == simple_user.email
    email_form.set('new_email', NEW_EMAIL)
    assert len(mailoutbox) == 0

    page = email_form.submit().follow()
    assert 'A mail was sent to john.doe@example.com to verify it.' in page.text
    assert 'Change user email' in page.text

    # Exactly one mail was sent, mentioning both the old and new address.
    assert len(mailoutbox) == 1
    verification_mail = mailoutbox[0]
    assert simple_user.email in verification_mail.body
    assert NEW_EMAIL in verification_mail.body

    # logout
    app.session.flush()

    confirmation_link = get_link_from_mail(verification_mail)
    page = app.get(confirmation_link).maybe_follow()
    success_message = (
        'your request for changing your email for john.doe@example.com is successful'
    )
    assert success_message in page.text

    simple_user.refresh_from_db()
    assert simple_user.email == NEW_EMAIL
def test_manager_user_change_email_no_change(app, superuser_or_admin, simple_user, mailoutbox):
    """Submit the change-email form untouched and expect no verification notice.

    ``validate_emails`` is enabled on the default OU and the form is
    submitted with its pre-filled value (the user's current address).
    """
    default_ou = get_default_ou()
    default_ou.validate_emails = True
    default_ou.save()

    NEW_EMAIL = 'john.doe@example.com'
    assert NEW_EMAIL != simple_user.email

    slug_kwargs = {'slug': text_type(simple_user.uuid)}
    detail_url = reverse('a2-manager-user-by-uuid-detail', kwargs=slug_kwargs)
    page = login(app, superuser_or_admin, detail_url)
    assert 'Change user email' in page.text

    # The entry on the detail page is a submit button and cannot be
    # clicked, so the change-email view is requested directly.
    change_url = reverse('a2-manager-user-by-uuid-change-email', kwargs=slug_kwargs)
    page = app.get(change_url)
    email_form = page.form
    assert email_form['new_email'].value == simple_user.email
    assert len(mailoutbox) == 0

    # Nothing was modified in the form before submitting it.
    page = email_form.submit().follow()
    assert 'A mail was sent to john.doe@example.com to verify it.' not in page.text
def test_search_by_attribute(app, simple_user, admin):
    """Search the user listing on the value of a searchable attribute."""
    Attribute.objects.create(name='adresse', searchable=True, kind='string')
    simple_user.attributes.adresse = 'avenue du revestel'

    def search(page, text):
        # Fill the search box of the listing and return the result page.
        page.form['search-text'] = text
        return page.form.submit()

    everyone = {simple_user.username, admin.username}

    listing = login(app, admin, '/manage/users/')
    # without any search text both users are listed
    assert visible_users(listing) == everyone

    # a text matching nobody hides both of them
    listing = search(listing, 'impasse')
    assert visible_users(listing).isdisjoint(everyone)

    # a text matching simple_user's attribute value leaves only that user
    listing = search(listing, 'avenue')
    assert visible_users(listing) == {simple_user.username}
@skipif_sqlite
def test_export_csv(settings, app, superuser, django_assert_num_queries):
    """Export users as CSV and check the query count and the table shape.

    Creates ``AT_COUNT`` string attributes and ``USER_COUNT`` users, gives
    every user a value for every attribute, then clicks the ``CSV`` link of
    the users listing while counting SQL queries.
    """
    AT_COUNT = 30
    USER_COUNT = 2000
    DEFAULT_BATCH_SIZE = 1000

    ats = [Attribute(name='at%s' % i, label='At%s' % i, kind='string') for i in range(AT_COUNT)]
    Attribute.objects.bulk_create(ats)
    # Re-read the rows that were bulk-created from the database.
    ats = list(Attribute.objects.all())

    users = [User(username='user%s' % i) for i in range(USER_COUNT)]
    User.objects.bulk_create(users)
    users = list(User.objects.filter(username__startswith='user'))

    # The result was never used; the call itself is kept.
    # NOTE(review): presumably it warms the ContentType cache so that the
    # lookup is not counted in the block below - confirm.
    ContentType.objects.get_for_model(User)

    atvs = []
    for i in range(USER_COUNT):
        atvs.extend([AttributeValue(
            owner=users[i], attribute=ats[j], content='value-%s-%s' % (i, j)) for j in range(AT_COUNT)])
    AttributeValue.objects.bulk_create(atvs)

    response = login(app, superuser, reverse('a2-manager-users'))
    settings.A2_CACHE_ENABLED = True
    user_count = User.objects.count()

    # queries should be batched to keep prefetching working without
    # overspending memory for the queryset cache, 4 queries by batches.
    # Floor division keeps the expected count an integer on Python 3, where
    # "/" is true division and would yield a float that can never equal
    # the number of executed queries.
    batch_count = user_count // DEFAULT_BATCH_SIZE + bool(user_count % DEFAULT_BATCH_SIZE)
    num_queries = 4 + 4 * batch_count
    with django_assert_num_queries(num_queries):
        response = response.click('CSV')

    table = list(csv.reader(response.text.splitlines()))
    # one header row plus one row per user
    assert len(table) == (user_count + 1)
    # NOTE(review): 15 is presumably the number of non-attribute columns - confirm.
    assert len(table[0]) == (15 + AT_COUNT)
@skipif_sqlite
def test_export_csv_disabled_attribute(settings, app, superuser):
    """Check that a disabled attribute gets no column in the CSV export.

    Two attributes receive a value for one user, then one of them is
    disabled before the ``CSV`` link of the users listing is clicked.
    """
    attr = Attribute.objects.create(name='attr', label='Attr', kind='string')
    attr_d = Attribute.objects.create(name='attrd', label='Attrd', kind='string')

    user = User.objects.create(username='user-foo')
    AttributeValue.objects.create(owner=user, attribute=attr, content='attr-value')
    AttributeValue.objects.create(owner=user, attribute=attr_d, content='attrd-value')

    # Disable the second attribute only after it has been given a value.
    attr_d.disabled = True
    attr_d.save()

    response = login(app, superuser, reverse('a2-manager-users'))
    settings.A2_CACHE_ENABLED = True
    response = response.click('CSV')

    user_count = User.objects.count()
    # Parse the decoded text, as test_export_csv does: on Python 3
    # csv.reader only accepts str lines, not the bytes of response.content.
    table = list(csv.reader(response.text.splitlines()))
    # one header row plus one row per user
    assert len(table) == (user_count + 1)

    num_col = 15 + 1  # 1 is the number of active attributes,
    # the disabled attribute should not show up
    for line in table:
        assert len(line) == num_col
def test_user_table(app, admin, user_ou1, ou1):
    """Check that the username column follows the OUs' ``show_username`` flag.

    The listing is requested unfiltered, filtered on the default OU and
    filtered on ``ou1``, after ``show_username`` was turned off everywhere
    and again after it was turned back on for ``ou1`` only.
    """
    from authentic2.manager.utils import has_show_username

    listing_url = '/manage/users/'
    ou_filter = '/manage/users/?search-ou=%s'

    def shows_usernames(url):
        # True when the page at *url* contains at least one username cell.
        return bool(app.get(url).pyquery('td.username'))

    # base state, usernames are shown
    first_page = login(app, admin, listing_url)
    assert first_page.pyquery('td.username')

    # hide all usernames, from specific and general view
    OU.objects.update(show_username=False)
    has_show_username.cache.clear()
    assert not shows_usernames(listing_url)
    assert not shows_usernames(ou_filter % get_default_ou().id)
    assert not shows_usernames(ou_filter % ou1.id)

    # hide usernames except in OU1
    ou1.show_username = True
    ou1.save()
    has_show_username.cache.clear()
    assert not shows_usernames(listing_url)
    assert not shows_usernames(ou_filter % get_default_ou().id)
    assert shows_usernames(ou_filter % ou1.id)
@skipif_sqlite
@pytest.mark.parametrize('encoding', ['utf-8', 'cp1252', 'iso-8859-15'])
def test_user_import(encoding, transactional_db, app, admin, ou1, admin_ou1, media):
    """Upload a users CSV in *encoding*, simulate the import, then execute it.

    The file holds two well-formed rows and one ``x,x,x,x`` row. After the
    simulation no user may have been created; after the execution exactly
    two users are expected. Finally the import pages must answer 403 to
    the ``admin_ou1`` user.
    """
    Attribute.objects.create(name='phone', kind='phone_number', label='Numéro de téléphone')

    initial_user_count = User.objects.count()
    assert Attribute.objects.count() == 3

    page = login(app, admin, '/manage/users/')
    page = page.click('Import users')

    csv_payload = u'''email key verified,first_name,last_name,phone
tnoel@entrouvert.com,Thomas,Noël,1234
fpeters@entrouvert.com,Frédéric,Péters,5678
x,x,x,x'''.encode(encoding)
    upload_form = page.form
    upload_form.set('import_file', Upload('users.csv', csv_payload, 'application/octet-stream'))
    upload_form.set('encoding', encoding)
    upload_form.set('ou', str(get_default_ou().pk))
    page = upload_form.submit()

    known_imports = list(user_import.UserImport.all())
    assert len(known_imports) == 1
    # the import uuid is the last path component of the redirect target
    _import = user_import.UserImport(uuid=page.location.split('/')[-2])
    assert _import.exists()

    page = page.follow()
    page = page.forms['action-form'].submit(name='modify').follow()
    page = page.forms['action-form'].submit(name='simulate')

    reports = list(_import.reports)
    assert len(reports) == 1
    simulate = reports[0]
    # uuid of the report that wait_finished() polls for; rebound below
    watched_uuid = simulate.uuid

    page = page.follow()

    def assert_timeout(duration, wait_function):
        # Poll wait_function until it returns something else than None,
        # failing once more than `duration` seconds have elapsed.
        started_at = time.time()
        while True:
            outcome = wait_function()
            if outcome is not None:
                return outcome
            assert time.time() - started_at < duration, '%s timed out after %s seconds' % (wait_function, duration)
            time.sleep(0.001)

    def wait_finished():
        # Reads `page` and `watched_uuid` from the enclosing scope at call
        # time; returns the refreshed page once the watched report's state
        # reads 'finished', None otherwise.
        refreshed = page.click('Users Import')
        state = refreshed.pyquery('tr[data-uuid="%s"] td.state span' % watched_uuid).text()
        if state == 'finished':
            return refreshed
        return None

    assert simulate.simulate

    page = assert_timeout(2, wait_finished)
    page = page.click(href=simulate.uuid)

    expected_rows = (
        ('table.main tbody tr', 3),
        ('table.main tbody tr.row-valid', 2),
        ('table.main tbody tr.row-invalid', 1),
    )
    for selector, expected in expected_rows:
        assert len(page.pyquery(selector)) == expected

    # the simulation must not have created anything
    assert User.objects.count() == initial_user_count

    page = page.click('Users Import')
    page = page.forms['action-form'].submit(name='execute')

    execute = [report for report in _import.reports if not report.simulate][0]
    watched_uuid = execute.uuid

    page = page.follow()
    page = assert_timeout(2, wait_finished)

    assert User.objects.count() == initial_user_count + 2
    imported_users = (
        ('tnoel@entrouvert.com', u'Thomas', u'Noël', '1234'),
        ('fpeters@entrouvert.com', u'Frédéric', u'Péters', '5678'),
    )
    for email, first_name, last_name, phone in imported_users:
        matching = User.objects.filter(
            email=email,
            first_name=first_name,
            last_name=last_name,
            attribute_values__content=phone)
        assert matching.count() == 1

    # logout
    app.session.flush()
    login(app, admin_ou1, '/manage/users/')

    forbidden_urls = (
        '/manage/users/import/',
        '/manage/users/import/%s/' % _import.uuid,
        '/manage/users/import/%s/%s/' % (_import.uuid, simulate.uuid),
        '/manage/users/import/%s/%s/' % (_import.uuid, execute.uuid),
    )
    for url in forbidden_urls:
        app.get(url, status=403)