# authentic/tests/test_commands.py
#
# 478 lines
# 17 KiB
# Python

# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import importlib
import json
from io import BufferedReader, BufferedWriter, TextIOWrapper
import py
import pytest
import webtest
from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType
from django.utils.timezone import now
from jwcrypto.jwk import JWK, JWKSet
from authentic2.a2_rbac.models import (
ADMIN_OP,
MANAGE_MEMBERS_OP,
VIEW_OP,
Operation,
OrganizationalUnit,
Permission,
Role,
)
from authentic2.a2_rbac.utils import get_default_ou, get_operation
from authentic2.apps.journal.models import Event
from authentic2.custom_user.models import DeletedUser
from authentic2.models import UserExternalId
from authentic2_auth_oidc.models import OIDCAccount, OIDCProvider
from .utils import call_command, login
# User model class returned by get_user_model(); the tests below use it to
# create and count accounts.
User = get_user_model()
def test_changepassword(db, simple_user, monkeypatch):
    """Running `changepassword` for 'user' changes the stored `password` value."""
    import getpass

    # answer any getpass prompt with the fixed value 'pass'
    monkeypatch.setattr(getpass, 'getpass', lambda *args, **kwargs: 'pass')

    previous_password = simple_user.password
    call_command('changepassword', 'user')

    simple_user.refresh_from_db()
    assert simple_user.password != previous_password
def test_clean_unused_account(db, simple_user, mailoutbox, freezer, settings):
    """Walk `clean-unused-accounts` through the alert and then the deletion step.

    Three users share the same last login: `simple_user`, one user linked to a
    `UserExternalId` with source 'ldap' and one linked to an `OIDCAccount`.
    The assertions expect a single mail per step and only `simple_user` to
    end up recorded as a `DeletedUser`.
    """
    settings.LDAP_AUTH_SETTINGS = [{'realm': 'ldap', 'url': 'ldap://ldap.com/', 'basedn': 'dc=ldap,dc=com'}]
    ou = simple_user.ou
    ldap_user = User.objects.create(username='ldap-user', email='ldap-user@example.com', ou=ou)
    oidc_user = User.objects.create(username='oidc-user', email='oidc-user@example.com', ou=ou)
    UserExternalId.objects.create(user=ldap_user, source='ldap', external_id='whatever')
    provider = OIDCProvider.objects.create(name='oidc', ou=ou)
    OIDCAccount.objects.create(user=oidc_user, provider=provider, sub='1')
    recipient = simple_user.email

    def journal_count(event_type):
        # number of journal events of that type recorded for simple_user
        return Event.objects.filter(
            type__name=event_type, user=simple_user, data__email=simple_user.email
        ).count()

    freezer.move_to('2018-01-01')
    ou.clean_unused_accounts_alert = 2
    ou.clean_unused_accounts_deletion = 3
    ou.save()

    inactive_since = now() - datetime.timedelta(days=2, seconds=30)
    for account in (simple_user, ldap_user, oidc_user):
        account.last_login = inactive_since
        account.save()

    # first run: one alert mail, nobody deleted
    call_command('clean-unused-accounts')
    assert User.objects.count() == 3
    assert len(mailoutbox) == 1
    assert journal_count('user.notification.inactivity') == 1

    # half a day later: no new mail, no deletion
    freezer.move_to('2018-01-01 12:00:00')
    call_command('clean-unused-accounts')
    assert User.objects.count() == 3
    assert len(mailoutbox) == 1

    # next day: one account is removed and a second mail goes out
    freezer.move_to('2018-01-02')
    call_command('clean-unused-accounts')
    assert User.objects.count() == 2
    deleted_user = DeletedUser.objects.get()
    assert deleted_user.old_user_id == simple_user.id
    assert len(mailoutbox) == 2
    assert mailoutbox[-1].to == [recipient]
    assert journal_count('user.deletion.inactivity') == 1
def test_clean_unused_account_user_logs_in(app, db, simple_user, mailoutbox, freezer):
    """A user who logs in after the inactivity alert is not deleted.

    Timeline (alert delay 2 days, deletion delay 3 days, last login set to
    two days before 2018-01-01):

    * 2018-01-01: first run sends one alert, then the user logs in;
    * 2018-01-02: the run must neither send mail nor delete the account;
    * 2018-01-04: the run sends a second alert.
    """
    freezer.move_to('2018-01-01')
    ou = simple_user.ou
    ou.clean_unused_accounts_alert = 2
    ou.clean_unused_accounts_deletion = 3
    ou.save()
    simple_user.last_login = now() - datetime.timedelta(days=2)
    simple_user.save()

    call_command('clean-unused-accounts')
    assert len(mailoutbox) == 1

    login(app, simple_user)

    # the day of deletion, nothing happens; the command has to actually run
    # here, otherwise the assertion below cannot fail
    freezer.move_to('2018-01-02')
    call_command('clean-unused-accounts')
    # refresh_from_db() raises DoesNotExist if the account had been deleted
    simple_user.refresh_from_db()
    assert len(mailoutbox) == 1

    # when new alert delay is reached, user gets alerted again
    freezer.move_to('2018-01-04')
    call_command('clean-unused-accounts')
    simple_user.refresh_from_db()
    assert len(mailoutbox) == 2
def test_clean_unused_account_disabled_by_default(db, simple_user, mailoutbox):
    """With no alert/deletion delay configured by the test, no mail is sent."""
    two_days_ago = now() - datetime.timedelta(days=2)
    simple_user.last_login = two_days_ago
    simple_user.save()

    call_command('clean-unused-accounts')

    # the account must still be loadable and nothing must have been mailed
    simple_user.refresh_from_db()
    assert len(mailoutbox) == 0
def test_clean_unused_account_a2_user_exclude(app, db, simple_user, mailoutbox, freezer, settings):
    """A user matching `A2_USER_EXCLUDE` receives no inactivity mail."""
    settings.A2_USER_EXCLUDE = {'username': simple_user.username}
    freezer.move_to('2018-01-01')

    ou = simple_user.ou
    ou.clean_unused_accounts_alert = 2
    ou.clean_unused_accounts_deletion = 3
    ou.save()

    simple_user.last_login = now() - datetime.timedelta(days=2)
    simple_user.save()

    call_command('clean-unused-accounts')
    assert len(mailoutbox) == 0
def test_clean_unused_account_always_alert(db, simple_user, mailoutbox, freezer):
    """An alert is sent first even when the last login is older than the deletion delay."""
    ou = simple_user.ou
    ou.clean_unused_accounts_alert = 2
    # one day between alert and actual deletion
    ou.clean_unused_accounts_deletion = 3
    ou.save()

    simple_user.last_login = now() - datetime.timedelta(days=4)
    simple_user.save()

    # first run: the last login is already past the deletion delay, yet the
    # account is still there and exactly one mail has been sent.
    # second run: no additional mail and the account is still loadable.
    for _ in range(2):
        call_command('clean-unused-accounts')
        simple_user.refresh_from_db()
        assert len(mailoutbox) == 1
@pytest.mark.parametrize(
    "deletion_delay,formatted", [(730, '2\xa0years'), (500, '1\xa0year'), (65, '2\xa0months')]
)
def test_clean_unused_account_human_duration_format(simple_user, mailoutbox, deletion_delay, formatted):
    """The delay appears in human-readable form in both the alert and the deletion mail."""
    ou = simple_user.ou
    ou.clean_unused_accounts_alert = deletion_delay - 1
    ou.clean_unused_accounts_deletion = deletion_delay
    ou.save()
    simple_user.last_login = now() - datetime.timedelta(days=deletion_delay + 1)
    simple_user.save()

    # alert email: the duration is in body and subject, and the subject is a single line
    call_command('clean-unused-accounts')
    alert_mail = mailoutbox[0]
    assert formatted in alert_mail.body
    assert formatted in alert_mail.subject
    assert '\n' not in alert_mail.subject

    # deletion email: backdate the alert by two days, then run again
    simple_user.last_account_deletion_alert = now() - datetime.timedelta(days=2)
    simple_user.save()
    call_command('clean-unused-accounts')
    deletion_mail = mailoutbox[1]
    assert formatted in deletion_mail.body
def test_clean_unused_account_login_url(simple_user, mailoutbox):
    """The alert mail contains a link to the login page."""
    ou = simple_user.ou
    ou.clean_unused_accounts_alert = 1
    ou.clean_unused_accounts_deletion = 2
    ou.save()

    simple_user.last_login = now() - datetime.timedelta(days=1)
    simple_user.save()

    call_command('clean-unused-accounts')

    raw_message = mailoutbox[0].message().as_string()
    assert 'href="http://testserver/login/"' in raw_message
def test_clean_unused_account_with_no_email(simple_user, mailoutbox, caplog):
    """A user with an empty email gets no mail and no failure is logged."""
    ou = simple_user.ou
    ou.clean_unused_accounts_alert = 1
    ou.clean_unused_accounts_deletion = 2
    ou.save()

    simple_user.email = ''
    simple_user.last_login = now() - datetime.timedelta(days=1)
    simple_user.save()

    call_command('clean-unused-accounts')

    assert len(mailoutbox) == 0
    assert 'clean-unused-accounts failed' not in caplog.text
def test_cleanupauthentic(db):
    """Smoke test: the `cleanupauthentic` command runs without raising."""
    call_command('cleanupauthentic')
def test_load_ldif(db, monkeypatch, tmpdir):
    """Check what `load-ldif` passes to its LDIF parser.

    `DjangoUserLDIFParser` is replaced in the command module by a mock whose
    constructor asserts on its arguments: exactly one positional file object
    and an `options` dict carrying the extra attribute mapping and the result
    name. The command is run twice, once with keyword options and once with
    command-line style arguments, and both runs must satisfy the same
    assertions.
    """
    file_types = (TextIOWrapper, BufferedReader, BufferedWriter)
    ldif = tmpdir.join('some.ldif')
    ldif.ensure()

    class MockParser:
        def __init__(self, *args, **kwargs):
            self.users = []
            assert len(args) == 1
            assert isinstance(args[0], file_types)
            assert kwargs['options']['extra_attribute'] == {'ldap_attr': 'first_name'}
            assert kwargs['options']['result'] == 'result'

        def parse(self):
            pass

    # the module name contains a dash, so it cannot be imported with `import`
    load_ldif_cmd = importlib.import_module('authentic2.management.commands.load-ldif')
    # monkeypatch keeps the replacement in place for the whole test, so a
    # single patch covers both invocations below
    monkeypatch.setattr(load_ldif_cmd, 'DjangoUserLDIFParser', MockParser)

    call_command('load-ldif', ldif.strpath, result='result', extra_attribute={'ldap_attr': 'first_name'})

    # test ExtraAttributeAction
    call_command(
        'load-ldif', '--extra-attribute', 'ldap_attr', 'first_name', '--result', 'result', ldif.strpath
    )
def test_oidc_register_issuer(db, tmpdir, monkeypatch):
    """`oidc-register-issuer` creates a provider with the requested name and issuer.

    The command module's `register_issuer` is replaced by a local stand-in
    that skips JWKSet retrieval and creates the `OIDCProvider` directly from
    the `openid_configuration` mapping it receives.
    """

    def register_issuer(
        name,
        client_id,
        client_secret,
        issuer=None,
        openid_configuration=None,
        verify=True,
        timeout=None,
        ou=None,
    ):
        # NOTE: this stand-in ignores the `ou`, `client_id` and `client_secret`
        # it is given and uses the default OU and fixed credentials instead.
        ou = OrganizationalUnit.objects.get(default=True)
        # skipping jwkset retrieval in mocked function
        jwkset = JWKSet()
        jwkset.add(JWK.generate(kty='RSA', size=512, kid='auie'))
        jwkset.add(JWK.generate(kty='EC', size=256, kid='tsrn'))
        return OIDCProvider.objects.create(
            name=name,
            slug='test',
            client_id='abc',
            client_secret='def',
            enabled=True,
            ou=ou,
            issuer=issuer,
            strategy='create',
            jwkset_json=jwkset.export(as_dict=True),
            authorization_endpoint=openid_configuration['authorization_endpoint'],
            token_endpoint=openid_configuration['token_endpoint'],
            userinfo_endpoint=openid_configuration['userinfo_endpoint'],
            end_session_endpoint=openid_configuration['end_session_endpoint'],
        )

    # the module name contains dashes, so it cannot be imported with `import`
    oidc_cmd = importlib.import_module('authentic2_auth_oidc.management.commands.oidc-register-issuer')
    monkeypatch.setattr(oidc_cmd, 'register_issuer', register_issuer)

    # the command receives the path of the configuration file sitting next to this test
    oidc_conf_path = py.path.local(__file__).dirpath('openid_configuration.json').strpath
    call_command(
        'oidc-register-issuer',
        '--openid-configuration',
        oidc_conf_path,
        '--issuer',
        'issuer',
        '--client-id',
        'auie',
        '--client-secret',
        'tsrn',
        'somename',
    )

    provider = OIDCProvider.objects.get(name='somename')
    assert provider.issuer == 'issuer'
def test_resetpassword(simple_user):
    """Running `resetpassword` for 'user' changes the stored `password` value."""
    previous_password = simple_user.password
    call_command('resetpassword', 'user')

    simple_user.refresh_from_db()
    assert simple_user.password != previous_password
def test_sync_metadata(db):
    """Smoke test: `sync-metadata` runs on the `metadata.xml` file next to this module."""
    tests_dir = py.path.local(__file__)
    metadata_path = tests_dir.dirpath('metadata.xml').strpath
    call_command('sync-metadata', metadata_path, source='abcd')
def test_check_and_repair_managers_of_roles(db, capsys):
    """Run `check-and-repair --repair` against a hand-built, inconsistent admin role.

    The setup creates a role in the default OU, an ADMIN_OP permission on it
    scoped to that OU, and a "Managers of Role 1" role (no OU) holding that
    permission. The test then checks the messages printed by the command and
    the state of the permission and of the admin role afterwards.
    """
    default_ou = get_default_ou()
    admin_op = get_operation(ADMIN_OP)
    OrganizationalUnit.objects.create(name='Orgunit1', slug='orgunit1')
    role1 = Role.objects.create(name='Role 1', slug='role-1', ou=default_ou)
    perm1 = Permission.objects.create(
        operation=admin_op,
        target_id=role1.id,
        ou=default_ou,
        target_ct=ContentType.objects.get_for_model(Role),
    )
    broken_manager_role = Role.objects.create(name='Managers of Role 1', slug='_a2-managers-of-role-role1')
    broken_manager_role.permissions.add(perm1)
    broken_manager_role.save()

    call_command('check-and-repair', '--repair', '--noinput')

    output = capsys.readouterr().out
    expected_messages = (
        '"Managers of Role 1": no admin scope',
        'Managers of Role 1" wrong ou, should be "Default organizational unit"',
        'invalid permission "Management / role / Role 1": not manage_members operation',
        'invalid permission "Management / role / Role 1": not admin_scope and not self manage permission',
        'invalid admin role "Managers of Role 1" wrong ou, should be "Default organizational unit" is "None"',
    )
    for message in expected_messages:
        assert message in output

    # the original permission lost its OU
    perm1.refresh_from_db()
    assert perm1.ou is None

    # the admin role of role1 is in the default OU and holds three permissions
    admin_role = role1.get_admin_role()
    assert admin_role.ou == get_default_ou()
    assert admin_role.permissions.count() == 3
    assert admin_role.permissions.get(operation=get_operation(MANAGE_MEMBERS_OP), target_id=admin_role.id)
    assert admin_role.permissions.get(operation=get_operation(MANAGE_MEMBERS_OP), target_id=role1.id)
    assert admin_role.permissions.get(
        operation=get_operation(VIEW_OP),
        target_ct=ContentType.objects.get_for_model(ContentType),
        target_id=ContentType.objects.get_for_model(User).pk,
    )

    manage_members_op = get_operation(MANAGE_MEMBERS_OP)
    # NOTE(review): permissions are created and filtered with `operation=`
    # everywhere else in this test; `op` may not be a model field, in which
    # case this assignment and save() change nothing -- confirm the intent.
    perm1.op = manage_members_op
    perm1.save()

    call_command('check-and-repair', '--repair', '--noinput')

    perm1 = Permission.objects.get(operation=manage_members_op, target_id=role1.id)
    assert perm1.ou is None
def test_check_and_delete_unused_permissions(db, capsys, simple_user):
    """`check-and-repair --repair` leaves one permission fewer than the `--fake` run.

    Two permissions are created: one set as `admin_scope` of a role, and one
    targeting `simple_user` that nothing else in this test references.
    """
    role_ct = ContentType.objects.get_for_model(Role)
    user_ct = ContentType.objects.get_for_model(get_user_model())

    role1 = Role.objects.create(name='Role1', slug='role1')
    op1 = Operation.objects.create(slug='operation-1')

    used_perm = Permission.objects.create(operation=op1, target_id=role1.id, target_ct=role_ct)
    role1.admin_scope = used_perm
    role1.save()

    Permission.objects.create(operation=op1, target_id=simple_user.id, target_ct=user_ct)

    # count after the --fake run, then compare with the count after --repair
    call_command('check-and-repair', '--fake', '--noinput')
    count_before_repair = Permission.objects.count()

    call_command('check-and-repair', '--repair', '--noinput')
    assert Permission.objects.count() == count_before_repair - 1
def test_check_identifiers_uniqueness(db, capsys, settings):
    """`check-and-repair` reports accounts sharing a username or an email.

    Username uniqueness is switched off while the three accounts are created,
    then both uniqueness settings are switched on before running the command.
    """
    settings.A2_USERNAME_IS_UNIQUE = False
    ou = get_default_ou()
    ou.email_is_unique = True
    ou.save()

    # 'foo' is used as username twice, 'bar@example.net' as email twice
    accounts = (
        {'username': 'foo', 'email': 'foo@example.net', 'first_name': 'Toto', 'last_name': 'Foo'},
        {'username': 'foo', 'email': 'bar@example.net', 'first_name': 'Bar', 'last_name': 'Foo'},
        {'username': 'bar', 'email': 'bar@example.net', 'first_name': 'Tutu', 'last_name': 'Bar'},
    )
    for fields in accounts:
        User.objects.create(ou=ou, **fields)

    settings.A2_EMAIL_IS_UNIQUE = True
    settings.A2_USERNAME_IS_UNIQUE = True

    call_command('check-and-repair', '--repair', '--noinput')

    output = capsys.readouterr().out
    assert 'found 2 user accounts with same username' in output
    assert 'found 2 user accounts with same email' in output
def test_clean_unused_account_max_mails_per_period(settings, db, mailoutbox, freezer):
    """With `A2_CLEAN_UNUSED_ACCOUNTS_MAX_MAIL_PER_PERIOD = 4`, mails go out in batches.

    100 users are created with `last_login` set to the current time; the
    command is run immediately, one day later and two days later.
    """
    ou = get_default_ou()
    ou.clean_unused_accounts_alert = 1
    ou.clean_unused_accounts_deletion = 2
    ou.save()
    settings.A2_CLEAN_UNUSED_ACCOUNTS_MAX_MAIL_PER_PERIOD = 4

    for index in range(100):
        User.objects.create(ou=ou, email='user-%s@example.com' % index, last_login=now())

    one_day = datetime.timedelta(days=1)

    # same day: nothing is sent
    call_command('clean-unused-accounts')
    assert len(mailoutbox) == 0

    # one day later: 4 alerts
    freezer.move_to(one_day)
    call_command('clean-unused-accounts')
    assert len(mailoutbox) == 4

    # another day later: 4 new alerts and 4 deletions notifications
    freezer.move_to(one_day)
    call_command('clean-unused-accounts')
    assert len(mailoutbox) == 4 + 8
def test_clean_user_exports(settings, app, superuser, freezer):
    """`clean-user-exports` keeps a CSV export at 5 days and it is gone at 8 days."""
    new_users = [User(username='user%s' % index) for index in range(10)]
    User.objects.bulk_create(new_users)

    # export directory does not exist yet
    call_command('clean-user-exports')

    users_page = login(app, superuser, '/manage/users/')
    export_page = users_page.click('CSV').follow()
    created_at = now()
    assert export_page.click('Download CSV')

    # 5 days after creation the download link still works
    freezer.move_to(created_at + datetime.timedelta(days=5))
    call_command('clean-user-exports')
    assert export_page.click('Download CSV')

    # 8 days after creation the download fails
    freezer.move_to(created_at + datetime.timedelta(days=8))
    call_command('clean-user-exports')
    with pytest.raises(webtest.app.AppError):
        export_page.click('Download CSV')