2203 lines
83 KiB
Python
2203 lines
83 KiB
Python
# -*- coding: utf-8 -*-
|
|
# authentic2 - versatile identity manager
|
|
# Copyright (C) 2010-2019 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
|
|
import datetime
|
|
import json
|
|
import mock
|
|
import pytest
|
|
import random
|
|
import uuid
|
|
|
|
|
|
import django
|
|
from django.contrib.auth.hashers import check_password
|
|
from django.contrib.auth import get_user_model
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.core import mail
|
|
from django.urls import reverse
|
|
from django.utils.encoding import force_text
|
|
from django.utils.text import slugify
|
|
from django.utils.timezone import now
|
|
from django.utils.http import urlencode
|
|
|
|
from rest_framework import VERSION as drf_version
|
|
from django_rbac.models import SEARCH_OP
|
|
from django_rbac.utils import get_role_model, get_ou_model
|
|
from requests.models import Response
|
|
|
|
from authentic2.a2_rbac.models import Role
|
|
from authentic2.a2_rbac.utils import get_default_ou
|
|
from authentic2.apps.journal.models import EventType, Event
|
|
from authentic2.models import Service, Attribute, AttributeValue, AuthorizedRole
|
|
from authentic2.utils import good_next_url
|
|
|
|
from .utils import login, basic_authorization_header, get_link_from_mail
|
|
|
|
pytestmark = pytest.mark.django_db
|
|
|
|
User = get_user_model()
|
|
|
|
|
|
def test_api_user_simple(logged_app):
|
|
resp = logged_app.get('/api/user/')
|
|
assert isinstance(resp.json, dict)
|
|
assert 'username' in resp.json
|
|
|
|
|
|
def test_api_user(client):
|
|
# create an user, an ou role, a service and a service role
|
|
ou = get_default_ou()
|
|
|
|
Attribute.objects.create(kind='birthdate', name='birthdate', label='birthdate', required=True)
|
|
user = User.objects.create(ou=ou, username='john.doe', first_name=u'Jôhn',
|
|
last_name=u'Doe', email='john.doe@example.net')
|
|
user.attributes.birthdate = datetime.date(2019, 2, 2)
|
|
user.set_password('password')
|
|
user.save()
|
|
|
|
role1 = Role.objects.create(name='Role1', ou=ou)
|
|
role1.members.add(user)
|
|
|
|
service = Service.objects.create(name='Service1', slug='service1', ou=ou)
|
|
role2 = Role.objects.create(name='Role2', service=service)
|
|
role2.members.add(user)
|
|
|
|
Role.objects.create(name='Role3', ou=ou)
|
|
Role.objects.create(name='Role4', service=service)
|
|
|
|
# test failure when unlogged
|
|
response = client.get('/api/user/', HTTP_ORIGIN='http://testserver')
|
|
assert response.content == b'{}'
|
|
|
|
# login
|
|
client.login(request=None, username='john.doe', password='password')
|
|
response = client.get('/api/user/', HTTP_ORIGIN='http://testserver')
|
|
data = json.loads(force_text(response.content))
|
|
assert isinstance(data, dict)
|
|
assert set(data.keys()) == set(['uuid', 'username', 'first_name',
|
|
'ou__slug', 'ou__uuid', 'ou__name',
|
|
'last_name', 'email', 'roles', 'services',
|
|
'is_superuser', 'ou', 'birthdate'])
|
|
assert data['uuid'] == user.uuid
|
|
assert data['username'] == user.username
|
|
assert data['first_name'] == user.first_name
|
|
assert data['last_name'] == user.last_name
|
|
assert data['email'] == user.email
|
|
assert data['is_superuser'] == user.is_superuser
|
|
assert data['ou'] == ou.name
|
|
assert data['ou__name'] == ou.name
|
|
assert data['ou__slug'] == ou.slug
|
|
assert data['ou__uuid'] == ou.uuid
|
|
assert data['birthdate'] == '2019-02-02'
|
|
assert isinstance(data['roles'], list)
|
|
assert len(data['roles']) == 2
|
|
for role in data['roles']:
|
|
assert set(role.keys()) == set(['uuid', 'name', 'slug', 'is_admin',
|
|
'is_service', 'ou__uuid', 'ou__name',
|
|
'ou__slug'])
|
|
assert (role['uuid'] == role1.uuid and
|
|
role['name'] == role1.name and
|
|
role['slug'] == role1.slug and
|
|
role['is_admin'] is False and
|
|
role['is_service'] is False and
|
|
role['ou__uuid'] == ou.uuid and
|
|
role['ou__name'] == ou.name and
|
|
role['ou__slug'] == ou.slug) or \
|
|
(role['uuid'] == role2.uuid and
|
|
role['name'] == role2.name and
|
|
role['slug'] == role2.slug and
|
|
role['is_admin'] is False and
|
|
role['is_service'] is True and
|
|
role['ou__uuid'] == ou.uuid and
|
|
role['ou__name'] == ou.name and
|
|
role['ou__slug'] == ou.slug)
|
|
|
|
assert isinstance(data['services'], list)
|
|
assert len(data['services']) == 1
|
|
s = data['services'][0]
|
|
assert set(s.keys()) == set(['name', 'slug', 'ou', 'ou__name', 'ou__slug',
|
|
'ou__uuid', 'roles'])
|
|
assert s['name'] == service.name
|
|
assert s['slug'] == service.slug
|
|
assert s['ou'] == ou.name
|
|
assert s['ou__name'] == ou.name
|
|
assert s['ou__slug'] == ou.slug
|
|
assert s['ou__uuid'] == ou.uuid
|
|
assert isinstance(s['roles'], list)
|
|
assert len(s['roles']) == 2
|
|
for role in s['roles']:
|
|
assert set(role.keys()) == set(['uuid', 'name', 'slug', 'is_admin',
|
|
'is_service', 'ou__uuid', 'ou__name',
|
|
'ou__slug'])
|
|
assert (role['uuid'] == role1.uuid and
|
|
role['name'] == role1.name and
|
|
role['slug'] == role1.slug and
|
|
role['is_admin'] is False and
|
|
role['is_service'] is False and
|
|
role['ou__uuid'] == ou.uuid and
|
|
role['ou__name'] == ou.name and
|
|
role['ou__slug'] == ou.slug) or \
|
|
(role['uuid'] == role2.uuid and
|
|
role['name'] == role2.name and
|
|
role['slug'] == role2.slug and
|
|
role['is_admin'] is False and
|
|
role['is_service'] is True and
|
|
role['ou__uuid'] == ou.uuid and
|
|
role['ou__name'] == ou.name and
|
|
role['ou__slug'] == ou.slug)
|
|
|
|
|
|
def test_api_users_list(app, user):
|
|
app.authorization = ('Basic', (user.username, user.username))
|
|
resp = app.get('/api/users/')
|
|
assert isinstance(resp.json, dict)
|
|
assert set(['previous', 'next', 'results']) == set(resp.json.keys())
|
|
assert resp.json['previous'] is None
|
|
assert resp.json['next'] is None
|
|
|
|
|
|
def test_api_users_update_with_email_verified(settings, app, admin, simple_user):
|
|
from django.contrib.auth import get_user_model
|
|
simple_user.email_verified = True
|
|
simple_user.save()
|
|
|
|
payload = {
|
|
'username': simple_user.username,
|
|
'id': simple_user.id,
|
|
'email': 'john.doe@nowhere.null',
|
|
'first_name': 'Johnny',
|
|
'last_name': 'Doeny',
|
|
'email_verified': True,
|
|
}
|
|
headers = basic_authorization_header(admin)
|
|
resp = app.put_json('/api/users/{}/'.format(simple_user.uuid),
|
|
params=payload, headers=headers, status=200)
|
|
user = User.objects.get(id=simple_user.id)
|
|
assert user.email_verified
|
|
assert resp.json['email_verified']
|
|
|
|
user.email_verified = True
|
|
user.email = 'johnny.doeny@foo.bar'
|
|
user.save()
|
|
|
|
resp = app.patch_json('/api/users/{}/'.format(simple_user.uuid),
|
|
params=payload, headers=headers, status=200)
|
|
user = User.objects.get(id=simple_user.id)
|
|
assert user.email_verified
|
|
assert resp.json['email_verified']
|
|
|
|
|
|
def test_api_users_update_without_email_verified(settings, app, admin, simple_user):
|
|
from django.contrib.auth import get_user_model
|
|
simple_user.email_verified = True
|
|
simple_user.save()
|
|
|
|
payload = {
|
|
'username': simple_user.username,
|
|
'id': simple_user.id,
|
|
'email': 'john.doe@nowhere.null',
|
|
'first_name': 'Johnny',
|
|
'last_name': 'Doeny',
|
|
}
|
|
headers = basic_authorization_header(admin)
|
|
resp = app.put_json('/api/users/{}/'.format(simple_user.uuid),
|
|
params=payload, headers=headers, status=200)
|
|
user = User.objects.get(id=simple_user.id)
|
|
assert not user.email_verified
|
|
assert not resp.json['email_verified']
|
|
|
|
user.email_verified = True
|
|
user.email = 'johnny.doeny@foo.bar'
|
|
user.save()
|
|
|
|
resp = app.patch_json('/api/users/{}/'.format(simple_user.uuid),
|
|
params=payload, headers=headers, status=200)
|
|
user = User.objects.get(id=simple_user.id)
|
|
assert not user.email_verified
|
|
assert not resp.json['email_verified']
|
|
|
|
|
|
def test_api_users_update_with_same_unique_email(settings, app, admin, simple_user):
|
|
from django.contrib.auth import get_user_model
|
|
ou = get_default_ou()
|
|
ou.email_is_unique = True
|
|
ou.save()
|
|
|
|
payload = {
|
|
'username': simple_user.username,
|
|
'id': simple_user.id,
|
|
'email': 'john.doe@nowhere.null',
|
|
'first_name': 'Johnny',
|
|
'last_name': 'Doeny',
|
|
}
|
|
headers = basic_authorization_header(admin)
|
|
resp = app.put_json('/api/users/{}/'.format(simple_user.uuid),
|
|
params=payload, headers=headers, status=200)
|
|
user = User.objects.get(id=simple_user.id)
|
|
|
|
resp = app.patch_json('/api/users/{}/'.format(simple_user.uuid),
|
|
params=payload, headers=headers, status=200)
|
|
|
|
|
|
def test_api_users_create_with_email_verified(settings, app, admin):
|
|
from django.contrib.auth import get_user_model
|
|
|
|
payload = {
|
|
'username': 'janedoe',
|
|
'email': 'jane.doe@nowhere.null',
|
|
'first_name': 'Jane',
|
|
'last_name': 'Doe',
|
|
'email_verified': True,
|
|
}
|
|
headers = basic_authorization_header(admin)
|
|
resp = app.post_json('/api/users/', headers=headers, params=payload,
|
|
status=201)
|
|
assert resp.json['email_verified']
|
|
user = User.objects.get(uuid=resp.json['uuid'])
|
|
assert user.email_verified
|
|
|
|
|
|
def test_api_users_create_without_email_verified(settings, app, admin):
|
|
from django.contrib.auth import get_user_model
|
|
|
|
payload = {
|
|
'username': 'janedoe',
|
|
'email': 'jane.doe@nowhere.null',
|
|
'first_name': 'Jane',
|
|
'last_name': 'Doe',
|
|
}
|
|
headers = basic_authorization_header(admin)
|
|
resp = app.post_json('/api/users/', headers=headers, params=payload,
|
|
status=201)
|
|
assert not resp.json['email_verified']
|
|
user = User.objects.get(uuid=resp.json['uuid'])
|
|
assert not user.email_verified
|
|
|
|
|
|
def test_api_email_unset_verification(settings, app, admin, simple_user):
|
|
from django.contrib.auth import get_user_model
|
|
simple_user.email_verified = True
|
|
simple_user.save()
|
|
|
|
payload = {
|
|
'email': 'john.doe@nowhere.null',
|
|
}
|
|
headers = basic_authorization_header(admin)
|
|
resp = app.post_json('/api/users/{}/email/'.format(simple_user.uuid),
|
|
params=payload, headers=headers, status=200)
|
|
user = User.objects.get(id=simple_user.id)
|
|
assert not user.email_verified
|
|
|
|
|
|
def test_api_users_boolean_attribute(app, superuser):
|
|
at = Attribute.objects.create(
|
|
kind='boolean', name='boolean', label='boolean', required=True)
|
|
superuser.attributes.boolean = True
|
|
app.authorization = ('Basic', (superuser.username, superuser.username))
|
|
resp = app.get('/api/users/%s/' % superuser.uuid)
|
|
assert resp.json['boolean'] is True
|
|
|
|
|
|
def test_api_users_boolean_attribute_optional(app, superuser):
|
|
at = Attribute.objects.create(
|
|
kind='boolean', name='boolean', label='boolean', required=False)
|
|
superuser.attributes.boolean = True
|
|
app.authorization = ('Basic', (superuser.username, superuser.username))
|
|
resp = app.get('/api/users/%s/' % superuser.uuid)
|
|
assert resp.json['boolean'] is True
|
|
|
|
|
|
def test_api_users_list_by_authorized_service(app, superuser):
|
|
from authentic2.models import Service
|
|
|
|
app.authorization = ('Basic', (superuser.username, superuser.username))
|
|
|
|
user1 = User.objects.create(username='user1')
|
|
user2 = User.objects.create(username='user2')
|
|
user3 = User.objects.create(username='user3')
|
|
|
|
role1 = Role.objects.create(name='role1')
|
|
role2 = Role.objects.create(name='role2')
|
|
role1.add_child(role2)
|
|
user1.roles.set([role1])
|
|
user2.roles.set([role2])
|
|
|
|
service1 = Service.objects.create(ou=get_default_ou(), name='service1', slug='service1')
|
|
service1.add_authorized_role(role1)
|
|
|
|
service2 = Service.objects.create(ou=get_default_ou(), name='service2', slug='service2')
|
|
|
|
resp = app.get('/api/users/')
|
|
assert len(resp.json['results']) == 4
|
|
|
|
resp = app.get('/api/users/?service-ou=default&service-slug=service1')
|
|
assert len(resp.json['results']) == 2
|
|
assert set(user['username'] for user in resp.json['results']) == set(['user1', 'user2'])
|
|
|
|
resp = app.get('/api/users/?service-ou=default&service-slug=service2')
|
|
assert len(resp.json['results']) == 4
|
|
|
|
|
|
def test_api_users_list_search_text(app, superuser):
|
|
app.authorization = ('Basic', (superuser.username, superuser.username))
|
|
User = get_user_model()
|
|
someuser = User.objects.create(username='someuser')
|
|
resp = app.get('/api/users/?q=some')
|
|
results = resp.json['results']
|
|
assert len(results) == 1
|
|
assert results[0]['username'] == 'someuser'
|
|
someuser.mark_as_deleted()
|
|
resp = app.get('/api/users/?q=some')
|
|
results = resp.json['results']
|
|
assert not len(results)
|
|
|
|
|
|
def test_api_users_create(settings, app, api_user):
|
|
from django.contrib.auth import get_user_model
|
|
|
|
at = Attribute.objects.create(kind='title', name='title', label='title')
|
|
app.authorization = ('Basic', (api_user.username, api_user.username))
|
|
# test missing first_name
|
|
payload = {
|
|
'username': 'john.doe',
|
|
'email': 'john.doe@example.net',
|
|
}
|
|
if api_user.roles.exists():
|
|
status = 400
|
|
payload['ou'] = api_user.ou.slug
|
|
resp = app.post_json('/api/users/', params=payload, status=400)
|
|
assert resp.json['result'] == 0
|
|
assert set(['first_name', 'last_name']) == set(resp.json['errors'])
|
|
settings.A2_API_USERS_REQUIRED_FIELDS = ['email']
|
|
if api_user.is_superuser or hasattr(api_user, 'oidc_client') or api_user.roles.exists():
|
|
status = 201
|
|
else:
|
|
status = 403
|
|
resp = app.post_json('/api/users/', params=payload, status=status)
|
|
if status == 201:
|
|
assert resp.json
|
|
del settings.A2_API_USERS_REQUIRED_FIELDS
|
|
|
|
payload = {
|
|
'username': 'john.doe',
|
|
'first_name': 'John',
|
|
'last_name': 'Doe',
|
|
'email': 'john.doe@example.net',
|
|
'password': 'password',
|
|
'title': 'Mr',
|
|
}
|
|
if api_user.is_superuser:
|
|
status = 201
|
|
elif api_user.roles.exists():
|
|
status = 201
|
|
payload['ou'] = api_user.ou.slug
|
|
else:
|
|
status = 403
|
|
|
|
resp = app.post_json('/api/users/', params=payload, status=status)
|
|
if api_user.is_superuser or api_user.roles.exists():
|
|
assert set(['ou', 'id', 'uuid', 'is_staff', 'is_superuser',
|
|
'first_name', 'first_name_verified', 'last_name',
|
|
'last_name_verified', 'date_joined', 'last_login',
|
|
'username', 'password', 'email', 'is_active', 'title',
|
|
'title_verified', 'modified', 'email_verified',
|
|
'last_account_deletion_alert', 'deleted']) == set(resp.json.keys())
|
|
assert resp.json['first_name'] == payload['first_name']
|
|
assert resp.json['last_name'] == payload['last_name']
|
|
assert resp.json['email'] == payload['email']
|
|
assert resp.json['username'] == payload['username']
|
|
assert resp.json['title'] == payload['title']
|
|
assert resp.json['uuid']
|
|
assert resp.json['id']
|
|
assert resp.json['date_joined']
|
|
assert not resp.json['first_name_verified']
|
|
assert not resp.json['last_name_verified']
|
|
assert not resp.json['title_verified']
|
|
if api_user.is_superuser:
|
|
assert resp.json['ou'] == 'default'
|
|
elif api_user.roles.exists():
|
|
assert resp.json['ou'] == api_user.ou.slug
|
|
new_user = get_user_model().objects.get(id=resp.json['id'])
|
|
assert new_user.uuid == resp.json['uuid']
|
|
assert new_user.username == resp.json['username']
|
|
assert new_user.email == resp.json['email']
|
|
assert new_user.first_name == resp.json['first_name']
|
|
assert new_user.last_name == resp.json['last_name']
|
|
assert AttributeValue.objects.with_owner(new_user).count() == 3
|
|
assert AttributeValue.objects.with_owner(new_user).filter(verified=True).count() == 0
|
|
assert AttributeValue.objects.with_owner(new_user).filter(attribute=at).exists()
|
|
assert (AttributeValue.objects.with_owner(new_user).get(attribute=at).content ==
|
|
payload['title'])
|
|
resp2 = app.get('/api/users/%s/' % resp.json['uuid'])
|
|
assert resp.json == resp2.json
|
|
payload.update({'uuid': '1234567890', 'email': 'foo@example.com',
|
|
'username': 'foobar'})
|
|
resp = app.post_json('/api/users/', params=payload, status=status)
|
|
assert resp.json['uuid'] == '1234567890'
|
|
assert 'title' in resp.json
|
|
at.disabled = True
|
|
at.save()
|
|
resp = app.get('/api/users/1234567890/')
|
|
assert 'title' not in resp.json
|
|
|
|
at.disabled = False
|
|
at.save()
|
|
payload = {
|
|
'username': 'john.doe2',
|
|
'first_name': 'John',
|
|
'first_name_verified': True,
|
|
'last_name': 'Doe',
|
|
'last_name_verified': True,
|
|
'email': 'john.doe@example.net',
|
|
'password': 'password',
|
|
'title': 'Mr',
|
|
'title_verified': True,
|
|
}
|
|
if api_user.is_superuser:
|
|
status = 201
|
|
elif api_user.roles.exists():
|
|
status = 201
|
|
payload['ou'] = api_user.ou.slug
|
|
else:
|
|
status = 403
|
|
|
|
resp = app.post_json('/api/users/', params=payload, status=status)
|
|
if api_user.is_superuser or api_user.roles.exists():
|
|
assert set(['ou', 'id', 'uuid', 'is_staff', 'is_superuser',
|
|
'first_name', 'first_name_verified', 'last_name',
|
|
'last_name_verified', 'date_joined', 'last_login',
|
|
'username', 'password', 'email', 'is_active', 'title',
|
|
'title_verified', 'modified', 'email_verified',
|
|
'last_account_deletion_alert', 'deleted']) == set(resp.json.keys())
|
|
user = get_user_model().objects.get(pk=resp.json['id'])
|
|
assert AttributeValue.objects.with_owner(user).filter(verified=True).count() == 3
|
|
assert AttributeValue.objects.with_owner(user).filter(verified=False).count() == 0
|
|
assert user.verified_attributes.first_name == 'John'
|
|
assert user.verified_attributes.last_name == 'Doe'
|
|
assert user.verified_attributes.title == 'Mr'
|
|
assert resp.json['first_name_verified']
|
|
assert resp.json['last_name_verified']
|
|
assert resp.json['title_verified']
|
|
resp2 = app.patch_json('/api/users/%s/' % resp.json['uuid'],
|
|
params={'title_verified': False})
|
|
assert resp.json['first_name_verified']
|
|
assert resp.json['last_name_verified']
|
|
assert not resp2.json['title_verified']
|
|
|
|
|
|
def test_api_users_create_email_is_unique(settings, app, superuser):
|
|
OU = get_ou_model()
|
|
ou1 = OU.objects.create(name='OU1', slug='ou1')
|
|
ou2 = OU.objects.create(name='OU2', slug='ou2', email_is_unique=True)
|
|
|
|
app.authorization = ('Basic', (superuser.username, superuser.username))
|
|
# test missing first_name
|
|
payload = {
|
|
'ou': 'ou1',
|
|
'first_name': 'John',
|
|
'last_name': 'Doe',
|
|
'email': 'john.doe@example.net',
|
|
}
|
|
|
|
assert User.objects.filter(ou=ou1).count() == 0
|
|
assert User.objects.filter(ou=ou2).count() == 0
|
|
|
|
app.post_json('/api/users/', params=payload)
|
|
assert User.objects.filter(ou=ou1).count() == 1
|
|
|
|
app.post_json('/api/users/', params=payload)
|
|
assert User.objects.filter(ou=ou1).count() == 2
|
|
|
|
payload['ou'] = 'ou2'
|
|
app.post_json('/api/users/', params=payload)
|
|
assert User.objects.filter(ou=ou2).count() == 1
|
|
|
|
resp = app.post_json('/api/users/', params=payload, status=400)
|
|
assert User.objects.filter(ou=ou2).count() == 1
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors']['email']
|
|
|
|
settings.A2_EMAIL_IS_UNIQUE = True
|
|
User.objects.filter(ou=ou1).delete()
|
|
assert User.objects.filter(ou=ou1).count() == 0
|
|
payload['ou'] = 'ou1'
|
|
app.post_json('/api/users/', params=payload, status=400)
|
|
assert User.objects.filter(ou=ou1).count() == 0
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors']['email']
|
|
|
|
payload['email'] = 'john.doe2@example.net'
|
|
resp = app.post_json('/api/users/', params=payload)
|
|
uuid = resp.json['uuid']
|
|
|
|
app.patch_json('/api/users/%s/' % uuid, params={'email': 'john.doe3@example.net'})
|
|
resp = app.patch_json('/api/users/%s/' % uuid, params={'email': 'john.doe@example.net'},
|
|
status=400)
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors']['email']
|
|
settings.A2_EMAIL_IS_UNIQUE = False
|
|
|
|
payload['ou'] = 'ou2'
|
|
payload['email'] = 'john.doe2@example.net'
|
|
resp = app.post_json('/api/users/', params=payload)
|
|
assert User.objects.filter(ou=ou2).count() == 2
|
|
uuid = resp.json['uuid']
|
|
resp = app.patch_json('/api/users/%s/' % uuid, params={'email': 'john.doe@example.net'},
|
|
status=400)
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors']['email']
|
|
|
|
|
|
def test_api_users_create_send_mail(app, settings, superuser, rf):
|
|
# Use case is often that Email is the main identifier
|
|
settings.A2_EMAIL_IS_UNIQUE = True
|
|
Attribute.objects.create(kind='title', name='title', label='title')
|
|
|
|
app.authorization = ('Basic', (superuser.username, superuser.username))
|
|
payload = {
|
|
'username': 'john.doe',
|
|
'first_name': 'John',
|
|
'last_name': 'Doe',
|
|
'email': 'john.doe@example.net',
|
|
'title': 'Mr',
|
|
'send_registration_email': True,
|
|
'send_registration_email_next_url': 'http://example.com/',
|
|
}
|
|
assert len(mail.outbox) == 0
|
|
resp = app.post_json('/api/users/', params=payload, status=201)
|
|
user_id = resp.json['id']
|
|
assert len(mail.outbox) == 1
|
|
# Follow activation link
|
|
url = get_link_from_mail(mail.outbox[0])
|
|
relative_url = url.split('testserver')[1]
|
|
resp = app.get(relative_url, status=200)
|
|
resp.form.set('new_password1', '1234==aA')
|
|
resp.form.set('new_password2', '1234==aA')
|
|
resp = resp.form.submit()
|
|
# Check user was properly logged in
|
|
assert str(app.session['_auth_user_id']) == str(user_id)
|
|
assert not good_next_url(rf.get('/'), 'http://example.com')
|
|
assert resp.location == 'http://example.com/'
|
|
|
|
|
|
def test_api_users_create_force_password_reset(app, client, settings, superuser):
|
|
app.authorization = ('Basic', (superuser.username, superuser.username))
|
|
payload = {
|
|
'username': 'john.doe',
|
|
'first_name': 'John',
|
|
'last_name': 'Doe',
|
|
'email': 'john.doe@example.net',
|
|
'password': '1234',
|
|
'force_password_reset': True,
|
|
}
|
|
app.post_json('/api/users/', params=payload, status=201)
|
|
# Verify password reset is enforced on next login
|
|
resp = login(app, 'john.doe', path='/', password='1234').follow()
|
|
resp.form.set('old_password', '1234')
|
|
resp.form.set('new_password1', '1234==aB')
|
|
resp.form.set('new_password2', '1234==aB')
|
|
resp = resp.form.submit('Submit').follow().maybe_follow()
|
|
assert 'Password changed' in resp
|
|
|
|
|
|
def test_api_role_add_member(app, api_user, role, member):
|
|
app.authorization = ('Basic', (api_user.username, api_user.username))
|
|
|
|
authorized = api_user.has_perm('a2_rbac.change_role', role)
|
|
|
|
if member.username == 'fake' or role.name == 'fake':
|
|
status = 404
|
|
elif authorized:
|
|
status = 201
|
|
else:
|
|
status = 403
|
|
|
|
resp = app.post_json('/api/roles/{0}/members/{1}/'.format(role.uuid, member.uuid),
|
|
status=status)
|
|
if status == 404:
|
|
pass
|
|
elif authorized:
|
|
assert resp.json['result'] == 1
|
|
assert resp.json['detail'] == 'User successfully added to role'
|
|
else:
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors'] == 'User not allowed to change role'
|
|
|
|
|
|
def test_api_role_remove_member(app, api_user, role, member):
|
|
app.authorization = ('Basic', (api_user.username, api_user.username))
|
|
|
|
authorized = api_user.is_superuser or api_user.has_perm('a2_rbac.admin_role', role)
|
|
|
|
if member.username == 'fake' or role.name == 'fake':
|
|
status = 404
|
|
elif authorized:
|
|
status = 200
|
|
else:
|
|
status = 403
|
|
|
|
resp = app.delete_json('/api/roles/{0}/members/{1}/'.format(role.uuid, member.uuid),
|
|
status=status)
|
|
|
|
if status == 404:
|
|
pass
|
|
elif authorized:
|
|
assert resp.json['result'] == 1
|
|
assert resp.json['detail'] == 'User successfully removed from role'
|
|
else:
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors'] == 'User not allowed to change role'
|
|
|
|
|
|
def test_api_role_add_members(app, api_user, role, member, member_rando2):
|
|
app.authorization = ('Basic', (api_user.username, api_user.username))
|
|
|
|
authorized = api_user.has_perm('a2_rbac.change_role', role)
|
|
|
|
if role.name == 'fake':
|
|
status = 404
|
|
elif not authorized:
|
|
status = 403
|
|
elif member.username == 'fake':
|
|
status = 400
|
|
else:
|
|
status = 201
|
|
|
|
payload = {
|
|
"data": []
|
|
}
|
|
|
|
for m in [member, member_rando2, member_rando2]: # test no duplicate
|
|
payload['data'].append({"uuid": m.uuid})
|
|
|
|
resp = app.post_json(
|
|
'/api/roles/{}/relationships/members/'.format(role.uuid),
|
|
params=payload, status=status)
|
|
|
|
if status in (400, 404):
|
|
pass
|
|
elif authorized:
|
|
assert resp.json['result'] == 1
|
|
assert resp.json['detail'] == 'Users successfully added to role'
|
|
for m in [member, member_rando2]:
|
|
assert m in role.members.all()
|
|
else:
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors'] == 'User not allowed to change role'
|
|
|
|
|
|
def test_api_role_remove_members(app, api_user, role, member, member_rando2):
|
|
app.authorization = ('Basic', (api_user.username, api_user.username))
|
|
|
|
authorized = api_user.has_perm('a2_rbac.change_role', role)
|
|
|
|
if role.name == 'fake':
|
|
status = 404
|
|
elif not authorized:
|
|
status = 403
|
|
elif member.username == 'fake':
|
|
status = 400
|
|
else:
|
|
status = 200
|
|
|
|
payload = {
|
|
"data": []
|
|
}
|
|
|
|
for m in [member, member_rando2, member_rando2]: # test no duplicate
|
|
payload['data'].append({"uuid": m.uuid})
|
|
|
|
resp = app.delete_json(
|
|
'/api/roles/{}/relationships/members/'.format(role.uuid),
|
|
params=payload, status=status)
|
|
|
|
if status in (400, 404):
|
|
pass
|
|
elif authorized:
|
|
assert resp.json['result'] == 1
|
|
assert resp.json['detail'] == 'Users successfully removed from role'
|
|
for m in [member, member_rando2]:
|
|
assert m not in role.members.all()
|
|
else:
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors'] == 'User not allowed to change role'
|
|
|
|
|
|
def test_api_role_set_members(app, api_user, role, member, member_rando2):
|
|
app.authorization = ('Basic', (api_user.username, api_user.username))
|
|
|
|
authorized = api_user.has_perm('a2_rbac.change_role', role)
|
|
|
|
if role.name == 'fake':
|
|
status = 404
|
|
elif not authorized:
|
|
status = 403
|
|
elif member.username == 'fake':
|
|
status = 400
|
|
else:
|
|
status = 200
|
|
|
|
payload = {
|
|
"data": []
|
|
}
|
|
|
|
for m in [member, member_rando2, member_rando2]: # test no duplicate
|
|
payload['data'].append({"uuid": m.uuid})
|
|
|
|
resp = app.put_json(
|
|
'/api/roles/{}/relationships/members/'.format(role.uuid),
|
|
params=payload, status=status)
|
|
|
|
if status in (400, 404):
|
|
pass
|
|
elif authorized:
|
|
assert resp.json['result'] == 1
|
|
assert resp.json['detail'] == 'Users successfully assigned to role'
|
|
assert len(role.members.all()) == 2
|
|
for m in [member, member_rando2]:
|
|
assert m in role.members.all()
|
|
else:
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors'] == 'User not allowed to change role'
|
|
|
|
|
|
def test_api_role_set_empty_members(app, api_user):
|
|
app.authorization = ('Basic', (api_user.username, api_user.username))
|
|
ou = get_default_ou()
|
|
|
|
user = User.objects.create(ou=ou, username='john.doe', first_name=u'Jôhn',
|
|
last_name=u'Doe', email='john.doe@example.net')
|
|
user.save()
|
|
|
|
role = Role.objects.create(name='Role1', ou=ou)
|
|
role.members.add(user)
|
|
|
|
status = 200
|
|
if not api_user.has_perm('a2_rbac.change_role', role):
|
|
status = 403
|
|
|
|
resp = app.put_json(
|
|
'/api/roles/{}/relationships/members/'.format(role.uuid),
|
|
params={'data': []}, status=status)
|
|
if api_user.has_perm('a2_rbac.change_role', role):
|
|
assert len(role.members.all()) == 0
|
|
else:
|
|
assert len(role.members.all()) == 1
|
|
|
|
|
|
def test_api_role_get_members(app, api_user, role):
|
|
app.authorization = ('Basic', (api_user.username, api_user.username))
|
|
authorized = api_user.has_perm('a2_rbac.change_role', role)
|
|
status = 405 if authorized else 403
|
|
|
|
app.get('/api/roles/{}/relationships/members/'.format(role.uuid), status=status)
|
|
|
|
|
|
def test_api_role_members_payload_missing(app, api_user, role):
|
|
app.authorization = ('Basic', (api_user.username, api_user.username))
|
|
authorized = api_user.has_perm('a2_rbac.change_role', role)
|
|
status = 400 if authorized else 403
|
|
|
|
app.post_json(
|
|
'/api/roles/{}/relationships/members/'.format(role.uuid),
|
|
status=status)
|
|
|
|
app.delete_json(
|
|
'/api/roles/{}/relationships/members/'.format(role.uuid),
|
|
status=status)
|
|
|
|
app.put_json(
|
|
'/api/roles/{}/relationships/members/'.format(role.uuid),
|
|
status=status)
|
|
|
|
|
|
def test_api_role_members_wrong_payload_types(app, superuser, role_random, member_rando2):
|
|
app.authorization = ('Basic', (superuser.username, superuser.username))
|
|
|
|
payload = [{
|
|
"data": [{'uuid': member_rando2.uuid}]
|
|
}]
|
|
|
|
resp = app.post_json(
|
|
'/api/roles/{}/relationships/members/'.format(role_random.uuid),
|
|
params=payload, status=400)
|
|
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors'] == ['Payload must be a dictionary']
|
|
|
|
payload = {
|
|
"data": [[member_rando2.uuid]]
|
|
}
|
|
|
|
resp = app.post_json(
|
|
'/api/roles/{}/relationships/members/'.format(role_random.uuid),
|
|
params=payload, status=400)
|
|
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors'] == ["List elements of the 'data' dict entry must be dictionaries"]
|
|
|
|
payload = {
|
|
"data": [member_rando2.uuid]
|
|
}
|
|
|
|
resp = app.post_json(
|
|
'/api/roles/{}/relationships/members/'.format(role_random.uuid),
|
|
params=payload, status=400)
|
|
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors'] == ["List elements of the 'data' dict entry must be dictionaries"]
|
|
|
|
|
|
def test_register_no_email_validation(settings, app, admin, django_user_model):
|
|
settings.A2_USERNAME_IS_UNIQUE = False
|
|
settings.A2_REGISTRATION_USERNAME_IS_UNIQUE = False
|
|
User = django_user_model
|
|
password = '12XYab'
|
|
username = 'john.doe'
|
|
email = 'john.doe@example.com'
|
|
first_name = 'John'
|
|
last_name = 'Doe'
|
|
return_url = 'http://sp.example.com/validate/'
|
|
|
|
# invalid payload
|
|
payload = {
|
|
'last_name': last_name,
|
|
'return_url': return_url,
|
|
}
|
|
headers = basic_authorization_header(admin)
|
|
assert len(mail.outbox) == 0
|
|
response = app.post_json(reverse('a2-api-register'), params=payload, headers=headers,
|
|
status=400)
|
|
assert 'errors' in response.json
|
|
assert response.json['result'] == 0
|
|
assert response.json['errors'] == {
|
|
'__all__': ['You must set at least a username, an email or a first name and a last name'],
|
|
}
|
|
|
|
# valid payload
|
|
payload = {
|
|
'username': username,
|
|
'email': email,
|
|
'first_name': first_name,
|
|
'last_name': last_name,
|
|
'password': password,
|
|
'no_email_validation': True,
|
|
'return_url': return_url,
|
|
}
|
|
assert len(mail.outbox) == 0
|
|
response = app.post_json(reverse('a2-api-register'), params=payload, headers=headers)
|
|
assert len(mail.outbox) == 0
|
|
assert response.status_code == 201
|
|
assert response.json['result'] == 1
|
|
assert response.json['user']['username'] == username
|
|
assert response.json['user']['email'] == email
|
|
assert response.json['user']['first_name'] == first_name
|
|
assert response.json['user']['last_name'] == last_name
|
|
assert check_password(password, response.json['user']['password'])
|
|
assert response.json['token']
|
|
assert response.json['validation_url'].startswith('http://testserver/accounts/activate/')
|
|
assert User.objects.count() == 2
|
|
user = User.objects.latest('id')
|
|
assert user.ou == get_default_ou()
|
|
assert user.username == username
|
|
assert user.email == email
|
|
assert user.first_name == first_name
|
|
assert user.last_name == last_name
|
|
assert user.check_password(password)
|
|
|
|
|
|
def test_register_ou_no_email_validation(settings, app, admin, django_user_model):
|
|
settings.A2_USERNAME_IS_UNIQUE = False
|
|
settings.A2_REGISTRATION_USERNAME_IS_UNIQUE = False
|
|
User = django_user_model
|
|
password = '12XYab'
|
|
username = 'john.doe'
|
|
email = 'john.doe@example.com'
|
|
first_name = 'John'
|
|
last_name = 'Doe'
|
|
return_url = 'http://sp.example.com/validate/'
|
|
ou = 'default'
|
|
|
|
# invalid payload
|
|
payload = {
|
|
'last_name': last_name,
|
|
'return_url': return_url,
|
|
}
|
|
headers = basic_authorization_header(admin)
|
|
assert len(mail.outbox) == 0
|
|
response = app.post_json(reverse('a2-api-register'), params=payload, headers=headers,
|
|
status=400)
|
|
assert 'errors' in response.json
|
|
assert response.json['result'] == 0
|
|
assert response.json['errors'] == {
|
|
'__all__': ['You must set at least a username, an email or a first name and a last name'],
|
|
}
|
|
|
|
# valid payload
|
|
payload = {
|
|
'username': username,
|
|
'email': email,
|
|
'first_name': first_name,
|
|
'last_name': last_name,
|
|
'password': password,
|
|
'no_email_validation': True,
|
|
'return_url': return_url,
|
|
'ou': ou,
|
|
}
|
|
assert len(mail.outbox) == 0
|
|
response = app.post_json(reverse('a2-api-register'), params=payload, headers=headers)
|
|
assert len(mail.outbox) == 0
|
|
assert response.status_code == 201
|
|
assert response.json['result'] == 1
|
|
assert response.json['user']['username'] == username
|
|
assert response.json['user']['email'] == email
|
|
assert response.json['user']['first_name'] == first_name
|
|
assert response.json['user']['last_name'] == last_name
|
|
assert check_password(password, response.json['user']['password'])
|
|
assert response.json['token']
|
|
assert response.json['validation_url'].startswith('http://testserver/accounts/activate/')
|
|
assert User.objects.count() == 2
|
|
user = User.objects.latest('id')
|
|
assert user.username == username
|
|
assert user.email == email
|
|
assert user.first_name == first_name
|
|
assert user.last_name == last_name
|
|
assert user.check_password(password)
|
|
|
|
|
|
def test_user_synchronization(app, simple_user):
|
|
headers = basic_authorization_header(simple_user)
|
|
uuids = []
|
|
for i in range(100):
|
|
user = User.objects.create(first_name='ben', last_name='dauve')
|
|
uuids.append(user.uuid)
|
|
unknown_uuids = [uuid.uuid4().hex for i in range(100)]
|
|
url = reverse('a2-api-users-synchronization')
|
|
content = {
|
|
'known_uuids': uuids + unknown_uuids,
|
|
}
|
|
random.shuffle(content['known_uuids'])
|
|
response = app.post_json(url, params=content, headers=headers, status=403)
|
|
|
|
# give custom_user.search_user permission to user
|
|
r = Role.objects.get_admin_role(ContentType.objects.get_for_model(User),
|
|
name='role',
|
|
slug='role',
|
|
operation=SEARCH_OP)
|
|
|
|
r.members.add(simple_user)
|
|
response = app.post_json(url, params=content, headers=headers)
|
|
assert response.json['result'] == 1
|
|
assert set(response.json['unknown_uuids']) == set(unknown_uuids)
|
|
|
|
|
|
def test_api_drf_authentication_class(app, admin, user_ou1, oidc_client):
|
|
url = '/api/users/%s/' % user_ou1.uuid
|
|
# test invalid client
|
|
app.authorization = ('Basic', ('foo', 'bar'))
|
|
resp = app.get(url, status=401)
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors'] == "Invalid username/password."
|
|
# test inactive client
|
|
admin.is_active = False
|
|
admin.save()
|
|
app.authorization = ('Basic', (admin.username, admin.username))
|
|
resp = app.get(url, status=401)
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors'] == "User inactive or deleted."
|
|
# test oidc client
|
|
app.authorization = ('Basic', (oidc_client.username, oidc_client.username))
|
|
app.get(url, status=200)
|
|
# test oidc client without has API access
|
|
oidc_client.oidc_client.has_api_access = False
|
|
oidc_client.oidc_client.save()
|
|
app.authorization = ('Basic', (oidc_client.username, oidc_client.username))
|
|
response = app.get(url, status=401)
|
|
assert response.json['result'] == 0
|
|
assert response.json['errors']
|
|
|
|
|
|
def test_api_check_password(app, superuser, oidc_client, user_ou1):
|
|
app.authorization = ('Basic', (superuser.username, superuser.username))
|
|
# test with invalid paylaod
|
|
payload = {'username': 'whatever'}
|
|
resp = app.post_json(reverse('a2-api-check-password'), params=payload, status=400)
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors'] == {u'password': [u'This field is required.']}
|
|
# test with invalid credentials
|
|
payload = {'username': 'whatever', 'password': 'password'}
|
|
resp = app.post_json(reverse('a2-api-check-password'), params=payload, status=200)
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors'] == ["Invalid username/password."]
|
|
# test with valid credentials
|
|
payload = {'username': user_ou1.username, 'password': user_ou1.username}
|
|
resp = app.post_json(reverse('a2-api-check-password'), params=payload, status=200)
|
|
assert resp.json['result'] == 1
|
|
# test valid oidc credentials
|
|
payload = {
|
|
'username': oidc_client.oidc_client.client_id,
|
|
'password': oidc_client.oidc_client.client_secret,
|
|
}
|
|
resp = app.post_json(reverse('a2-api-check-password'), params=payload, status=200)
|
|
assert resp.json['result'] == 1
|
|
assert resp.json['oidc_client'] is True
|
|
|
|
|
|
def test_password_change(app, ou1, admin):
|
|
app.authorization = ('Basic', (admin.username, admin.username))
|
|
|
|
user1 = User(username='john.doe', email='john.doe@example.com', ou=ou1)
|
|
user1.set_password('password')
|
|
user1.save()
|
|
user2 = User(username='john.doe2', email='john.doe@example.com', ou=ou1)
|
|
user2.set_password('password')
|
|
user2.save()
|
|
|
|
payload = {
|
|
'email': 'none@example.com',
|
|
'ou': ou1.slug,
|
|
'old_password': 'password',
|
|
'new_password': 'password2',
|
|
}
|
|
url = reverse('a2-api-password-change')
|
|
response = app.post_json(url, params=payload, status=400)
|
|
assert 'errors' in response.json
|
|
assert response.json['result'] == 0
|
|
|
|
payload = {
|
|
'email': 'john.doe@example.com',
|
|
'ou': ou1.slug,
|
|
'old_password': 'password',
|
|
'new_password': 'password2',
|
|
}
|
|
response = app.post_json(url, params=payload, status=400)
|
|
assert 'errors' in response.json
|
|
assert response.json['result'] == 0
|
|
user2.delete()
|
|
|
|
response = app.post_json(url, params=payload)
|
|
assert response.json['result'] == 1
|
|
assert User.objects.get(username='john.doe').check_password('password2')
|
|
|
|
|
|
def test_password_reset(app, ou1, admin, user_ou1, mailoutbox):
|
|
email = user_ou1.email
|
|
url = reverse('a2-api-users-password-reset', kwargs={'uuid': user_ou1.uuid})
|
|
app.authorization = ('Basic', (user_ou1.username, user_ou1.username))
|
|
app.post(url, status=403)
|
|
app.authorization = ('Basic', (admin.username, admin.username))
|
|
app.get(url, status=405)
|
|
user_ou1.email = ''
|
|
user_ou1.save()
|
|
resp = app.post(url, status=500)
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['reason'] == 'User has no mail'
|
|
user_ou1.email = email
|
|
user_ou1.save()
|
|
app.post(url, status=204)
|
|
assert len(mailoutbox) == 1
|
|
mail = mailoutbox[0]
|
|
assert mail.to[0] == email
|
|
assert 'http://testserver/accounts/password/reset/confirm/' in mail.body
|
|
|
|
|
|
def test_users_email(app, ou1, admin, user_ou1, mailoutbox):
|
|
url = reverse('a2-api-users-email', kwargs={'uuid': user_ou1.uuid})
|
|
# test access error
|
|
app.authorization = ('Basic', (user_ou1.username, user_ou1.username))
|
|
app.post(url, status=403)
|
|
|
|
# test method error
|
|
app.authorization = ('Basic', (admin.username, admin.username))
|
|
app.get(url, status=405)
|
|
|
|
new_email = 'newmail@yopmail.com'
|
|
resp = app.post_json(url, params={'email': new_email})
|
|
assert resp.json['result'] == 1
|
|
|
|
assert len(mailoutbox) == 1
|
|
mail = mailoutbox[0]
|
|
|
|
assert mail.to[0] == new_email
|
|
assert 'http://testserver/accounts/change-email/verify/' in mail.body
|
|
|
|
|
|
def test_api_delete_role(app, admin_ou1, role_ou1):
|
|
app.authorization = ('Basic', (admin_ou1.username, admin_ou1.username))
|
|
app.delete('/api/roles/{}/'.format(role_ou1.uuid))
|
|
assert not len(Role.objects.filter(slug='role_ou1'))
|
|
|
|
|
|
def test_api_delete_role_unauthorized(app, simple_user, role_ou1):
|
|
app.authorization = ('Basic', (simple_user.username, simple_user.username))
|
|
app.delete('/api/roles/{}/'.format(role_ou1.uuid), status=404)
|
|
assert len(Role.objects.filter(slug='role_ou1'))
|
|
|
|
|
|
def test_api_patch_role(app, admin_ou1, role_ou1):
|
|
app.authorization = ('Basic', (admin_ou1.username, admin_ou1.username))
|
|
|
|
role_data = {
|
|
'slug': 'updated-role',
|
|
}
|
|
app.patch_json('/api/roles/{}/'.format(role_ou1.uuid), params=role_data)
|
|
|
|
# The role API won't change the organizational unit attribute:
|
|
role_ou1.refresh_from_db()
|
|
assert role_ou1.name == 'role_ou1'
|
|
assert role_ou1.slug == 'updated-role'
|
|
assert role_ou1.ou.slug == 'ou1'
|
|
|
|
|
|
def test_api_patch_role_unauthorized(app, simple_user, role_ou1):
|
|
app.authorization = ('Basic', (simple_user.username, simple_user.username))
|
|
role_data = {
|
|
'slug': 'updated-role',
|
|
}
|
|
app.patch_json('/api/roles/{}/'.format(role_ou1.uuid), params=role_data, status=404)
|
|
role_ou1.refresh_from_db()
|
|
assert role_ou1.slug == 'role_ou1'
|
|
assert not len(Role.objects.filter(slug='updated-role'))
|
|
|
|
|
|
def test_api_put_role(app, admin_ou1, role_ou1, ou1):
|
|
app.authorization = ('Basic', (admin_ou1.username, admin_ou1.username))
|
|
|
|
role_data = {
|
|
'name': 'updated-role',
|
|
'slug': 'updated-role',
|
|
'ou': 'ou2'
|
|
}
|
|
app.put_json('/api/roles/{}/'.format(role_ou1.uuid), params=role_data)
|
|
role_ou1.refresh_from_db()
|
|
assert role_ou1.name == 'updated-role'
|
|
assert role_ou1.slug == 'updated-role'
|
|
assert role_ou1.ou.slug == 'ou1'
|
|
|
|
|
|
def test_api_put_role_unauthorized(app, simple_user, role_ou1, ou1):
|
|
app.authorization = ('Basic', (simple_user.username, simple_user.username))
|
|
role_data = {
|
|
'name': 'updated-role',
|
|
'slug': 'updated-role',
|
|
'ou': 'ou2'
|
|
}
|
|
app.put_json('/api/roles/{}/'.format(role_ou1.uuid), params=role_data, status=404)
|
|
role_ou1.refresh_from_db()
|
|
assert role_ou1.name == 'role_ou1'
|
|
assert role_ou1.slug == 'role_ou1'
|
|
assert role_ou1.ou.slug == 'ou1'
|
|
|
|
|
|
def test_api_post_role(app, admin_ou1, ou1):
|
|
app.authorization = ('Basic', (admin_ou1.username, admin_ou1.username))
|
|
|
|
role_data = {
|
|
'slug': 'coffee-manager',
|
|
'name': 'Coffee Manager',
|
|
'ou': 'ou1'
|
|
}
|
|
resp = app.post_json('/api/roles/', params=role_data)
|
|
assert isinstance(resp.json, dict)
|
|
uuid = resp.json['uuid']
|
|
|
|
# Check attribute values against the server's response:
|
|
assert set(role_data.items()) < set(resp.json.items())
|
|
|
|
# Check attributes values against the DB:
|
|
role = get_role_model().objects.get(uuid=uuid)
|
|
assert role.slug == role_data['slug']
|
|
assert role.name == role_data['name']
|
|
assert role.ou.slug == role_data['ou']
|
|
|
|
|
|
def test_api_post_role_no_ou(app, superuser):
|
|
app.authorization = ('Basic', (superuser.username, superuser.username))
|
|
role_data = {
|
|
'slug': 'tea-manager',
|
|
'name': 'Tea Manager',
|
|
}
|
|
resp = app.post_json('/api/roles/', params=role_data)
|
|
uuid = resp.json['uuid']
|
|
role = Role.objects.get(uuid=uuid)
|
|
assert role.ou == get_default_ou()
|
|
|
|
|
|
def test_api_post_role_no_slug(app, superuser):
|
|
app.authorization = ('Basic', (superuser.username, superuser.username))
|
|
role_data = {
|
|
'name': 'Some Role',
|
|
}
|
|
resp = app.post_json('/api/roles/', params=role_data)
|
|
uuid = resp.json['uuid']
|
|
role = Role.objects.get(uuid=uuid)
|
|
assert role.slug == slugify(role.name)
|
|
assert role.slug == slugify(role_data['name'])
|
|
# another call with same role name
|
|
role_data = {
|
|
'name': 'Some Role',
|
|
}
|
|
resp = app.post_json('/api/roles/', params=role_data, status=400)
|
|
assert resp.json['errors']['__all__'] == [
|
|
'The fields name, ou must make a unique set.',
|
|
'The fields slug, ou must make a unique set.',
|
|
]
|
|
# no slug no name
|
|
role_data = {
|
|
'id': 42,
|
|
}
|
|
resp = app.post_json('/api/roles/', params=role_data, status=400)
|
|
assert resp.json['errors']['name'] == ['This field is required.']
|
|
|
|
|
|
def test_api_post_ou_no_slug(app, superuser):
|
|
app.authorization = ('Basic', (superuser.username, superuser.username))
|
|
OU = get_ou_model()
|
|
|
|
ou_data = {
|
|
'name': 'Some Organizational Unit',
|
|
}
|
|
resp = app.post_json('/api/ous/', params=ou_data)
|
|
uuid = resp.json['uuid']
|
|
ou = OU.objects.get(uuid=uuid)
|
|
assert ou.id != get_default_ou().id
|
|
assert ou.slug == slugify(ou.name)
|
|
assert ou.slug == slugify(ou_data['name'])
|
|
# another call with same ou name
|
|
ou_data = {
|
|
'name': 'Some Organizational Unit',
|
|
}
|
|
resp = app.post_json('/api/ous/', params=ou_data, status=400)
|
|
assert resp.json['errors']['__all__'] == [
|
|
"The fields name must make a unique set.",
|
|
"The fields slug must make a unique set."
|
|
]
|
|
# no slug no name
|
|
ou_data = {
|
|
'id': 42,
|
|
}
|
|
resp = app.post_json('/api/ous/', params=ou_data, status=400)
|
|
assert resp.json['errors']['name'] == ['This field is required.']
|
|
|
|
|
|
def test_api_post_ou_get_or_create(app, superuser):
|
|
app.authorization = ('Basic', (superuser.username, superuser.username))
|
|
OU = get_ou_model()
|
|
# first get-or-create? -> create
|
|
ou_data = {
|
|
'name': 'Some Organizational Unit',
|
|
}
|
|
resp = app.post_json('/api/ous/', params=ou_data)
|
|
uuid = resp.json['uuid']
|
|
ou = OU.objects.get(uuid=uuid)
|
|
# second get-or-create? -> get
|
|
ou_data = {
|
|
'name': 'Some Organizational Unit',
|
|
'slug': ou.slug,
|
|
}
|
|
resp = app.post_json('/api/ous/?get_or_create=slug', params=ou_data)
|
|
assert resp.json['uuid'] == ou.uuid
|
|
# update-or-create? -> update
|
|
ou_data = {
|
|
'name': 'Another name',
|
|
'slug': ou.slug,
|
|
}
|
|
resp = app.post_json('/api/ous/?update_or_create=slug', params=ou_data)
|
|
assert resp.json['uuid'] == ou.uuid
|
|
assert OU.objects.get(uuid=resp.json['uuid']).name == ou_data['name']
|
|
|
|
|
|
def test_api_post_role_unauthorized(app, simple_user, ou1):
|
|
app.authorization = ('Basic', (simple_user.username, simple_user.username))
|
|
|
|
role_data = {
|
|
'slug': 'mocca-manager',
|
|
'name': 'Mocca Manager',
|
|
'ou': 'ou1'
|
|
}
|
|
|
|
app.post_json('/api/roles/', params=role_data, status=403)
|
|
assert not len(Role.objects.filter(slug='mocca-manager'))
|
|
|
|
|
|
def test_api_get_role_description(app, admin_rando_role, role_random):
|
|
app.authorization = ('Basic', (admin_rando_role.username, admin_rando_role.username))
|
|
resp = app.get('/api/roles/{}/'.format(role_random.uuid))
|
|
|
|
assert resp.json['slug'] == 'rando'
|
|
assert resp.json['ou'] == 'ou_rando'
|
|
|
|
|
|
def test_api_get_role_not_found(app, superuser):
|
|
app.authorization = ('Basic', (superuser.username, superuser.username))
|
|
app.get('/api/roles/thisisnotavalidroleslug/', status=404)
|
|
|
|
|
|
def test_api_get_role_list(app, admin_ou1, role_ou1, role_random):
|
|
app.authorization = ('Basic', (admin_ou1.username, admin_ou1.username))
|
|
resp = app.get('/api/roles/')
|
|
|
|
role_fields = ['slug', 'uuid', 'name', 'ou']
|
|
|
|
assert len(resp.json['results'])
|
|
|
|
for role_dict in resp.json['results']:
|
|
for field in role_fields:
|
|
assert field in role_dict
|
|
|
|
|
|
def test_no_opened_session_cookie_on_api(app, user, settings):
|
|
settings.A2_OPENED_SESSION_COOKIE_DOMAIN = 'testserver.local'
|
|
app.authorization = ('Basic', (user.username, user.username))
|
|
resp = app.get('/api/users/')
|
|
assert 'A2_OPENED_SESSION' not in app.cookies
|
|
|
|
|
|
def test_validate_password_default(app):
|
|
for password, ok, length, lower, digit, upper in (
|
|
('.', False, False, False, False, False),
|
|
('x' * 8, False, True, True, False, False),
|
|
('x' * 8 + '1', False, True, True, True, False),
|
|
('x' * 8 + '1X', True, True, True, True, True)):
|
|
response = app.post_json('/api/validate-password/', params={'password': password})
|
|
assert response.json['result'] == 1
|
|
assert response.json['ok'] is ok
|
|
assert len(response.json['checks']) == 4
|
|
assert response.json['checks'][0]['label'] == '8 characters'
|
|
assert response.json['checks'][0]['result'] is length
|
|
assert response.json['checks'][1]['label'] == '1 lowercase letter'
|
|
assert response.json['checks'][1]['result'] is lower
|
|
assert response.json['checks'][2]['label'] == '1 digit'
|
|
assert response.json['checks'][2]['result'] is digit
|
|
assert response.json['checks'][3]['label'] == '1 uppercase letter'
|
|
assert response.json['checks'][3]['result'] is upper
|
|
|
|
|
|
def test_validate_password_regex(app, settings):
|
|
settings.A2_PASSWORD_POLICY_REGEX = '^.*ok.*$'
|
|
settings.A2_PASSWORD_POLICY_REGEX_ERROR_MSG = 'must contain "ok"'
|
|
|
|
response = app.post_json('/api/validate-password/', params={'password': 'x' * 8 + '1X'})
|
|
assert response.json['result'] == 1
|
|
assert response.json['ok'] is False
|
|
assert len(response.json['checks']) == 5
|
|
assert response.json['checks'][0]['label'] == '8 characters'
|
|
assert response.json['checks'][0]['result'] is True
|
|
assert response.json['checks'][1]['label'] == '1 lowercase letter'
|
|
assert response.json['checks'][1]['result'] is True
|
|
assert response.json['checks'][2]['label'] == '1 digit'
|
|
assert response.json['checks'][2]['result'] is True
|
|
assert response.json['checks'][3]['label'] == '1 uppercase letter'
|
|
assert response.json['checks'][3]['result'] is True
|
|
assert response.json['checks'][4]['label'] == 'must contain "ok"'
|
|
assert response.json['checks'][4]['result'] is False
|
|
|
|
response = app.post_json('/api/validate-password/', params={'password': 'x' * 8 + 'ok1X'})
|
|
assert response.json['result'] == 1
|
|
assert response.json['ok'] is True
|
|
assert len(response.json['checks']) == 5
|
|
assert response.json['checks'][0]['label'] == '8 characters'
|
|
assert response.json['checks'][0]['result'] is True
|
|
assert response.json['checks'][1]['label'] == '1 lowercase letter'
|
|
assert response.json['checks'][1]['result'] is True
|
|
assert response.json['checks'][2]['label'] == '1 digit'
|
|
assert response.json['checks'][2]['result'] is True
|
|
assert response.json['checks'][3]['label'] == '1 uppercase letter'
|
|
assert response.json['checks'][3]['result'] is True
|
|
assert response.json['checks'][4]['label'] == 'must contain "ok"'
|
|
assert response.json['checks'][4]['result'] is True
|
|
|
|
|
|
def test_api_users_get_or_create(settings, app, admin):
|
|
app.authorization = ('Basic', (admin.username, admin.username))
|
|
# test missing first_name
|
|
payload = {
|
|
'email': 'john.doe@example.net',
|
|
'first_name': 'John',
|
|
'last_name': 'Doe',
|
|
}
|
|
resp = app.post_json('/api/users/?get_or_create=email', params=payload, status=201)
|
|
id = resp.json['id']
|
|
assert User.objects.get(id=id).first_name == 'John'
|
|
assert User.objects.get(id=id).last_name == 'Doe'
|
|
password = User.objects.get(id=id).password
|
|
|
|
resp = app.post_json('/api/users/?get_or_create=email', params=payload, status=200)
|
|
assert id == resp.json['id']
|
|
assert User.objects.get(id=id).first_name == 'John'
|
|
assert User.objects.get(id=id).last_name == 'Doe'
|
|
assert User.objects.get(id=id).password == password
|
|
|
|
payload = {
|
|
'email': 'john.doe@example.net',
|
|
'first_name': 'Jane',
|
|
}
|
|
resp = app.post_json('/api/users/?update_or_create=email', params=payload, status=200)
|
|
assert id == resp.json['id']
|
|
assert User.objects.get(id=id).first_name == 'Jane'
|
|
assert User.objects.get(id=id).last_name == 'Doe'
|
|
assert User.objects.get(id=id).password == password
|
|
|
|
payload['password'] = 'secret'
|
|
resp = app.post_json('/api/users/?update_or_create=email', params=payload, status=200)
|
|
assert User.objects.get(id=id).first_name == 'Jane'
|
|
assert User.objects.get(id=id).last_name == 'Doe'
|
|
assert User.objects.get(id=id).password != password
|
|
assert User.objects.get(id=id).check_password('secret')
|
|
|
|
|
|
def test_api_users_get_or_create_email_is_unique(settings, app, admin):
|
|
settings.A2_EMAIL_IS_UNIQUE = True
|
|
app.authorization = ('Basic', (admin.username, admin.username))
|
|
# test missing first_name
|
|
payload = {
|
|
'email': 'john.doe@example.net',
|
|
'first_name': 'John',
|
|
'last_name': 'Doe',
|
|
}
|
|
resp = app.post_json('/api/users/?get_or_create=email', params=payload, status=201)
|
|
id = resp.json['id']
|
|
assert User.objects.get(id=id).first_name == 'John'
|
|
assert User.objects.get(id=id).last_name == 'Doe'
|
|
|
|
resp = app.post_json('/api/users/?get_or_create=email', params=payload, status=200)
|
|
assert id == resp.json['id']
|
|
assert User.objects.get(id=id).first_name == 'John'
|
|
assert User.objects.get(id=id).last_name == 'Doe'
|
|
|
|
payload['first_name'] = 'Jane'
|
|
resp = app.post_json('/api/users/?update_or_create=email', params=payload, status=200)
|
|
assert id == resp.json['id']
|
|
assert User.objects.get(id=id).first_name == 'Jane'
|
|
assert User.objects.get(id=id).last_name == 'Doe'
|
|
|
|
|
|
def test_api_users_get_or_create_email_not_unique(settings, app, admin):
|
|
settings.A2_EMAIL_IS_UNIQUE = False
|
|
OU = get_ou_model()
|
|
ou1 = OU.objects.create(name='OU1', slug='ou1', email_is_unique=True)
|
|
ou2 = OU.objects.create(name='OU2', slug='ou2', email_is_unique=False)
|
|
|
|
app.authorization = ('Basic', (admin.username, admin.username))
|
|
payload = {
|
|
'email': 'john.doe@example.net',
|
|
'first_name': 'John',
|
|
'last_name': 'Doe',
|
|
'ou': 'ou1'
|
|
}
|
|
# 1. create
|
|
resp = app.post_json('/api/users/?get_or_create=email', params=payload, status=201)
|
|
id_user = resp.json['id']
|
|
assert User.objects.get(id=id_user).first_name == 'John'
|
|
assert User.objects.get(id=id_user).last_name == 'Doe'
|
|
assert User.objects.get(id=id_user).ou == ou1
|
|
# 2. get
|
|
resp = app.post_json('/api/users/?get_or_create=email', params=payload, status=200)
|
|
# 3. explicitly create in a different OU
|
|
payload['ou'] = 'ou2'
|
|
resp = app.post_json('/api/users/', params=payload, status=201)
|
|
id_user2 = resp.json['id']
|
|
assert id_user2 != id_user
|
|
assert User.objects.get(id=id_user2).first_name == 'John'
|
|
assert User.objects.get(id=id_user2).last_name == 'Doe'
|
|
assert User.objects.get(id=id_user2).ou == ou2
|
|
# 4. fail to retrieve a single instance for an ambiguous get-or-create key
|
|
resp = app.post_json('/api/users/?get_or_create=email', params=payload, status=409)
|
|
assert resp.json['errors'] == "retrieved several instances of model User for key attributes {'email': 'john.doe@example.net'}"
|
|
|
|
|
|
def test_api_users_get_or_create_multi_key(settings, app, admin):
|
|
app.authorization = ('Basic', (admin.username, admin.username))
|
|
# test missing first_name
|
|
payload = {
|
|
'email': 'john.doe@example.net',
|
|
'first_name': 'John',
|
|
'last_name': 'Doe',
|
|
}
|
|
resp = app.post_json('/api/users/?get_or_create=first_name&get_or_create=last_name', params=payload, status=201)
|
|
id = resp.json['id']
|
|
assert User.objects.get(id=id).first_name == 'John'
|
|
assert User.objects.get(id=id).last_name == 'Doe'
|
|
password = User.objects.get(id=id).password
|
|
|
|
resp = app.post_json('/api/users/?get_or_create=first_name&get_or_create=last_name', params=payload, status=200)
|
|
assert id == resp.json['id']
|
|
assert User.objects.get(id=id).first_name == 'John'
|
|
assert User.objects.get(id=id).last_name == 'Doe'
|
|
assert User.objects.get(id=id).password == password
|
|
|
|
payload['email'] = 'john.doe@example2.net'
|
|
payload['password'] = 'secret'
|
|
resp = app.post_json(
|
|
'/api/users/?update_or_create=first_name&update_or_create=last_name',
|
|
params=payload, status=200)
|
|
assert id == resp.json['id']
|
|
assert User.objects.get(id=id).email == 'john.doe@example2.net'
|
|
assert User.objects.get(id=id).password != password
|
|
assert User.objects.get(id=id).check_password('secret')
|
|
|
|
|
|
def test_api_roles_get_or_create(settings, ou1, app, admin):
|
|
app.authorization = ('Basic', (admin.username, admin.username))
|
|
# test missing first_name
|
|
payload = {
|
|
'ou': 'ou1',
|
|
'name': 'Role 1',
|
|
'slug': 'role-1',
|
|
}
|
|
resp = app.post_json('/api/roles/?get_or_create=slug', params=payload, status=201)
|
|
uuid = resp.json['uuid']
|
|
assert Role.objects.get(uuid=uuid).name == 'Role 1'
|
|
assert Role.objects.get(uuid=uuid).slug == 'role-1'
|
|
assert Role.objects.get(uuid=uuid).ou == ou1
|
|
|
|
resp = app.post_json('/api/roles/?get_or_create=slug', params=payload, status=200)
|
|
assert uuid == resp.json['uuid']
|
|
|
|
payload['name'] = 'Role 2'
|
|
resp = app.post_json('/api/roles/?update_or_create=slug', params=payload, status=200)
|
|
assert uuid == resp.json['uuid']
|
|
assert Role.objects.get(uuid=uuid).name == 'Role 2'
|
|
assert Role.objects.get(uuid=uuid).slug == 'role-1'
|
|
|
|
|
|
def test_api_users_hashed_password(settings, app, admin):
|
|
app.authorization = ('Basic', (admin.username, admin.username))
|
|
payload = {
|
|
'email': 'john.doe@example.net',
|
|
'first_name': 'John',
|
|
'last_name': 'Doe',
|
|
'hashed_password': 'pbkdf2_sha256$36000$re9zaUj1ize0$bX1cqB91ni4aMOtRh8//TLaJkX+xnD2w84MCQx9AJcE=',
|
|
}
|
|
resp = app.post_json('/api/users/?get_or_create=email', params=payload, status=201)
|
|
id = resp.json['id']
|
|
assert User.objects.get(id=id).first_name == 'John'
|
|
assert User.objects.get(id=id).last_name == 'Doe'
|
|
assert User.objects.get(id=id).check_password('admin')
|
|
password = User.objects.get(id=id).password
|
|
|
|
payload['hashed_password'] = 'sha-oldap$$e5e9fa1ba31ecd1ae84f75caaa474f3a663f05f4'
|
|
resp = app.post_json('/api/users/?update_or_create=email', params=payload, status=200)
|
|
assert User.objects.get(id=id).password != password
|
|
assert User.objects.get(id=id).check_password('secret')
|
|
|
|
payload['password'] = 'secret'
|
|
resp = app.post_json('/api/users/?update_or_create=email', params=payload, status=400)
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors']['password'] == ['conflict with provided hashed_password']
|
|
|
|
del payload['password']
|
|
payload['hashed_password'] = 'unknown_format'
|
|
resp = app.post_json('/api/users/?update_or_create=email', params=payload, status=400)
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors']['hashed_password'] == ['unknown hash format']
|
|
|
|
payload['hashed_password'] = 'argon2$wrong_format'
|
|
resp = app.post_json('/api/users/?update_or_create=email', params=payload, status=400)
|
|
assert resp.json['result'] == 0
|
|
if django.VERSION >= (1, 10, 0):
|
|
assert resp.json['errors']['hashed_password'] == ['hash format error']
|
|
else: # before Django 1.10 Argon2 is not recognized
|
|
assert resp.json['errors']['hashed_password'] == ['unknown hash format']
|
|
|
|
|
|
def test_api_users_required_attribute(settings, app, admin, simple_user):
|
|
assert Attribute.objects.get(name='last_name').required is True
|
|
|
|
payload = {
|
|
'username': simple_user.username,
|
|
'id': simple_user.id,
|
|
'email': 'john.doe@nowhere.null',
|
|
'first_name': 'Johnny',
|
|
}
|
|
headers = basic_authorization_header(admin)
|
|
|
|
# create fails
|
|
resp = app.post_json('/api/users/', params=payload, headers=headers, status=400)
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors']['last_name'] == ['This field is required.']
|
|
|
|
# update from missing value to blank field fails
|
|
payload['last_name'] = ''
|
|
resp = app.put_json('/api/users/{}/'.format(simple_user.uuid), params=payload, headers=headers, status=400)
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors']['last_name'] == ['This field may not be blank.']
|
|
|
|
# update with value pass
|
|
payload['last_name'] = 'Foobar'
|
|
resp = app.put_json('/api/users/{}/'.format(simple_user.uuid), params=payload, headers=headers, status=200)
|
|
user = User.objects.get(id=simple_user.id)
|
|
assert user.last_name == 'Foobar'
|
|
|
|
# update from non-empty value to blank fails
|
|
payload['last_name'] = ''
|
|
resp = app.put_json('/api/users/{}/'.format(simple_user.uuid), params=payload, headers=headers, status=400)
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors']['last_name'] == ['This field may not be blank.']
|
|
|
|
|
|
def test_api_users_required_date_attributes(settings, app, admin, simple_user):
|
|
Attribute.objects.create(kind='string', name='prefered_color', label='prefered color', required=True)
|
|
Attribute.objects.create(kind='date', name='date', label='date', required=True)
|
|
Attribute.objects.create(kind='birthdate', name='birthdate', label='birthdate', required=True)
|
|
|
|
payload = {
|
|
'username': simple_user.username,
|
|
'id': simple_user.id,
|
|
'email': 'john.doe@nowhere.null',
|
|
'first_name': 'Johnny',
|
|
'last_name': 'Doe',
|
|
}
|
|
headers = basic_authorization_header(admin)
|
|
|
|
# create fails
|
|
resp = app.post_json('/api/users/', params=payload, headers=headers, status=400)
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors']['prefered_color'] == ['This field is required.']
|
|
assert resp.json['errors']['date'] == ['This field is required.']
|
|
assert resp.json['errors']['birthdate'] == ['This field is required.']
|
|
|
|
# update from missing value to blank fails
|
|
payload['prefered_color'] = ''
|
|
payload['date'] = ''
|
|
payload['birthdate'] = ''
|
|
resp = app.put_json('/api/users/{}/'.format(simple_user.uuid), params=payload, headers=headers, status=400)
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors']['prefered_color'] == ['This field may not be blank.']
|
|
assert resp.json['errors']['date'] == ['This field may not be blank.']
|
|
assert resp.json['errors']['birthdate'] == ['This field may not be blank.']
|
|
|
|
# update with invalid values fails
|
|
payload['prefered_color'] = '?' * 257
|
|
payload['date'] = '0000-00-00'
|
|
payload['birthdate'] = '1899-12-31'
|
|
resp = app.put_json('/api/users/{}/'.format(simple_user.uuid), params=payload, headers=headers, status=400)
|
|
assert resp.json['result'] == 0
|
|
assert resp.json['errors']['prefered_color'] == ['Ensure this field has no more than 256 characters.']
|
|
assert any(error.startswith('Date has wrong format.') for error in resp.json['errors']['date'])
|
|
assert resp.json['errors']['birthdate'] == ['birthdate must be in the past and greater or equal than 1900-01-01.']
|
|
|
|
# update with values pass
|
|
payload['prefered_color'] = 'blue'
|
|
payload['date'] = '1515-1-15'
|
|
payload['birthdate'] = '1900-2-22'
|
|
resp = app.put_json('/api/users/{}/'.format(simple_user.uuid), params=payload, headers=headers, status=200)
|
|
|
|
# value are properly returned on a get
|
|
resp = app.get('/api/users/{}/'.format(simple_user.uuid), headers=headers, status=200)
|
|
assert resp.json['prefered_color'] == 'blue'
|
|
assert resp.json['date'] == '1515-01-15'
|
|
assert resp.json['birthdate'] == '1900-02-22'
|
|
|
|
|
|
def test_api_users_optional_date_attributes(settings, app, admin, simple_user):
|
|
Attribute.objects.create(kind='string', name='prefered_color', label='prefered color', required=False)
|
|
Attribute.objects.create(kind='date', name='date', label='date', required=False)
|
|
Attribute.objects.create(kind='birthdate', name='birthdate', label='birthdate', required=False)
|
|
|
|
payload = {
|
|
'username': simple_user.username,
|
|
'id': simple_user.id,
|
|
'email': 'john.doe@nowhere.null',
|
|
'first_name': 'Johnny',
|
|
'last_name': 'Doe',
|
|
}
|
|
headers = basic_authorization_header(admin)
|
|
resp = app.put_json('/api/users/{}/'.format(simple_user.uuid),
|
|
params=payload, headers=headers, status=200)
|
|
resp = app.get('/api/users/{}/'.format(simple_user.uuid),
|
|
headers=headers, status=200)
|
|
payload['prefered_color'] = None
|
|
payload['date'] = None
|
|
payload['birthdate'] = None
|
|
|
|
payload['prefered_color'] = ''
|
|
payload['date'] = ''
|
|
payload['birthdate'] = ''
|
|
resp = app.put_json('/api/users/{}/'.format(simple_user.uuid),
|
|
params=payload, headers=headers, status=200)
|
|
resp = app.get('/api/users/{}/'.format(simple_user.uuid),
|
|
headers=headers, status=200)
|
|
payload['prefered_color'] = None
|
|
payload['date'] = None
|
|
payload['birthdate'] = None
|
|
|
|
|
|
def test_filter_users_by_service(app, admin, simple_user, role_random, service):
|
|
app.authorization = ('Basic', (admin.username, admin.username))
|
|
|
|
resp = app.get('/api/users/')
|
|
assert len(resp.json['results']) == 2
|
|
|
|
resp = app.get('/api/users/?service-slug=xxx')
|
|
assert len(resp.json['results']) == 0
|
|
|
|
resp = app.get('/api/users/?service-slug=service&service-ou=default')
|
|
assert len(resp.json['results']) == 2
|
|
|
|
role_random.members.add(simple_user)
|
|
AuthorizedRole.objects.get_or_create(service=service, role=role_random)
|
|
|
|
resp = app.get('/api/users/?service-slug=service&service-ou=default')
|
|
assert len(resp.json['results']) == 1
|
|
|
|
|
|
def test_filter_users_by_last_modification(app, admin, simple_user, freezer):
|
|
app.authorization = ('Basic', (admin.username, admin.username))
|
|
|
|
freezer.move_to('2019-10-27T02:00:00Z')
|
|
|
|
admin.save()
|
|
simple_user.save()
|
|
|
|
# AmbiguousTimeError
|
|
resp = app.get('/api/users/', params={'modified__gt': '2019-10-27T02:58:07'})
|
|
assert len(resp.json['results']) == 2
|
|
resp = app.get('/api/users/', params={'modified__lt': '2019-10-27T02:58:07'})
|
|
assert len(resp.json['results']) == 0
|
|
|
|
|
|
def test_free_text_search(app, admin, settings):
|
|
settings.LANGUAGE_CODE = 'fr' # use fr date format
|
|
|
|
app.authorization = ('Basic', (admin.username, admin.username))
|
|
Attribute.objects.create(kind='birthdate', name='birthdate', label='birthdate', required=False, searchable=True)
|
|
|
|
user = User.objects.create()
|
|
user.attributes.birthdate = datetime.date(1982, 2, 10)
|
|
|
|
resp = app.get('/api/users/?q=10/02/1982')
|
|
assert len(resp.json['results']) == 1
|
|
assert resp.json['results'][0]['id'] == user.id
|
|
|
|
|
|
def test_find_duplicates(app, admin, settings):
|
|
settings.LANGUAGE_CODE = 'fr'
|
|
app.authorization = ('Basic', (admin.username, admin.username))
|
|
|
|
first_name = 'Jean-Kévin'
|
|
last_name = 'Du Château'
|
|
user = User.objects.create(first_name=first_name, last_name=last_name)
|
|
|
|
exact_match = {
|
|
'first_name': first_name,
|
|
'last_name': last_name,
|
|
}
|
|
resp = app.get('/api/users/find_duplicates/', params=exact_match)
|
|
assert resp.json['data'][0]['id'] == user.id
|
|
assert resp.json['data'][0]['duplicate_distance'] == 0
|
|
assert resp.json['data'][0]['text'] == 'Jean-Kévin Du Château'
|
|
|
|
typo = {
|
|
'first_name': 'Jean Kévin',
|
|
'last_name': 'Du Châtau',
|
|
}
|
|
resp = app.get('/api/users/find_duplicates/', params=typo)
|
|
assert resp.json['data'][0]['id'] == user.id
|
|
assert resp.json['data'][0]['duplicate_distance'] > 0
|
|
|
|
typo = {
|
|
'first_name': 'Jean Kévin',
|
|
'last_name': 'Château',
|
|
}
|
|
resp = app.get('/api/users/find_duplicates/', params=typo)
|
|
assert resp.json['data'][0]['id'] == user.id
|
|
|
|
other_person = {
|
|
'first_name': 'Jean-Kévin',
|
|
'last_name': 'Du Chêne',
|
|
}
|
|
user = User.objects.create(first_name='Éléonore', last_name='âêîôû')
|
|
resp = app.get('/api/users/find_duplicates/', params=other_person)
|
|
assert len(resp.json['data']) == 0
|
|
|
|
other_person = {
|
|
'first_name': 'Pierre',
|
|
'last_name': 'Du Château',
|
|
}
|
|
resp = app.get('/api/users/find_duplicates/', params=other_person)
|
|
assert len(resp.json['data']) == 0
|
|
|
|
|
|
def test_find_duplicates_unaccent(app, admin, settings):
|
|
settings.LANGUAGE_CODE = 'fr'
|
|
app.authorization = ('Basic', (admin.username, admin.username))
|
|
|
|
user = User.objects.create(first_name='Éléonore', last_name='âêîôû')
|
|
|
|
resp = app.get('/api/users/find_duplicates/', params={'first_name': 'Eleonore', 'last_name': 'aeiou'})
|
|
assert resp.json['data'][0]['id'] == user.id
|
|
|
|
|
|
def test_find_duplicates_birthdate(app, admin, settings):
|
|
settings.LANGUAGE_CODE = 'fr'
|
|
app.authorization = ('Basic', (admin.username, admin.username))
|
|
|
|
Attribute.objects.create(kind='birthdate', name='birthdate', label='birthdate', required=False, searchable=True)
|
|
|
|
user = User.objects.create(first_name='Jean', last_name='Dupont')
|
|
homonym = User.objects.create(first_name='Jean', last_name='Dupont')
|
|
user.attributes.birthdate = datetime.date(1980, 1, 2)
|
|
homonym.attributes.birthdate = datetime.date(1980, 1, 3)
|
|
|
|
params = {
|
|
'first_name': 'Jeanne',
|
|
'last_name': 'Dupont',
|
|
}
|
|
resp = app.get('/api/users/find_duplicates/', params=params)
|
|
assert len(resp.json['data']) == 2
|
|
|
|
params['birthdate'] = '1980-01-2',
|
|
resp = app.get('/api/users/find_duplicates/', params=params)
|
|
assert len(resp.json['data']) == 2
|
|
assert resp.json['data'][0]['id'] == user.pk
|
|
|
|
params['birthdate'] = '1980-01-3',
|
|
resp = app.get('/api/users/find_duplicates/', params=params)
|
|
assert len(resp.json['data']) == 2
|
|
assert resp.json['data'][0]['id'] == homonym.pk
|
|
|
|
|
|
class MockedRequestResponse(mock.Mock):
|
|
status_code = 200
|
|
|
|
def json(self):
|
|
return json.loads(self.content)
|
|
|
|
|
|
def test_api_address_autocomplete(app, admin, settings):
|
|
app.authorization = ('Basic', (admin.username, admin.username))
|
|
|
|
settings.ADDRESS_AUTOCOMPLETE_URL = 'example.com'
|
|
|
|
params = {'q': '42 avenue'}
|
|
with mock.patch('authentic2.api_views.requests.get') as requests_get:
|
|
mock_resp = Response()
|
|
mock_resp.status_code = 500
|
|
requests_get.return_value = mock_resp
|
|
resp = app.get('/api/address-autocomplete/', params=params)
|
|
assert resp.json == {}
|
|
assert requests_get.call_args_list[0][0][0] == 'example.com'
|
|
assert requests_get.call_args_list[0][1]['params'] == {'q': ['42 avenue']}
|
|
with mock.patch('authentic2.api_views.requests.get') as requests_get:
|
|
mock_resp = Response()
|
|
mock_resp.status_code = 404
|
|
requests_get.return_value = mock_resp
|
|
resp = app.get('/api/address-autocomplete/', params=params)
|
|
assert resp.json == {}
|
|
with mock.patch('authentic2.api_views.requests.get') as requests_get:
|
|
requests_get.return_value = MockedRequestResponse(content=json.dumps({'data': {'foo': 'bar'}}))
|
|
resp = app.get('/api/address-autocomplete/', params=params)
|
|
assert resp.json == {'data': {'foo': 'bar'}}
|
|
|
|
settings.ADDRESS_AUTOCOMPLETE_URL = None
|
|
with mock.patch('authentic2.api_views.requests.get') as requests_get:
|
|
resp = app.get('/api/address-autocomplete/', params=params)
|
|
assert resp.json == {}
|
|
assert requests_get.call_args_list == []
|
|
|
|
del settings.ADDRESS_AUTOCOMPLETE_URL
|
|
with mock.patch('authentic2.api_views.requests.get') as requests_get:
|
|
resp = app.get('/api/address-autocomplete/', params=params)
|
|
assert resp.json == {}
|
|
assert requests_get.call_args_list == []
|
|
|
|
|
|
def test_phone_normalization_ok(settings, app, admin):
|
|
headers = basic_authorization_header(admin)
|
|
Attribute.objects.create(name='phone', label='phone', kind='phone_number')
|
|
payload = {
|
|
'username': 'janedoe',
|
|
'phone': ' + 334-99 98.56/43',
|
|
'first_name': 'Jane',
|
|
'last_name': 'Doe',
|
|
}
|
|
resp = app.post_json('/api/users/', headers=headers, params=payload, status=201)
|
|
assert resp.json['phone'] == '+33499985643'
|
|
assert User.objects.get(username='janedoe').attributes.phone == '+33499985643'
|
|
|
|
|
|
def test_phone_normalization_nok(settings, app, admin):
|
|
headers = basic_authorization_header(admin)
|
|
Attribute.objects.create(name='phone', label='phone', kind='phone_number')
|
|
payload = {
|
|
'username': 'janedoe',
|
|
'first_name': 'Jane',
|
|
'last_name': 'Doe',
|
|
}
|
|
payload['phone'] = ' + 334-99+98.56/43',
|
|
app.post_json('/api/users/', headers=headers, params=payload, status=400)
|
|
|
|
payload['phone'] = '1#2'
|
|
app.post_json('/api/users/', headers=headers, params=payload, status=400)
|
|
|
|
|
|
def test_api_users_mark_as_deleted(app, settings, admin):
|
|
email='foo@example.net'
|
|
user1 = User.objects.create(username='foo', email=email)
|
|
user1.mark_as_deleted()
|
|
user2 = User.objects.create(username='foo2', email=email)
|
|
|
|
app.authorization = ('Basic', (admin.username, admin.username))
|
|
resp = app.get('/api/users/?email={}'.format(email))
|
|
assert len(resp.json['results']) == 1
|
|
|
|
payload = {
|
|
'username': 'foo3',
|
|
'email': email,
|
|
'first_name': 'John',
|
|
'last_name': 'Doe',
|
|
}
|
|
headers = basic_authorization_header(admin)
|
|
app.post_json('/api/users/', headers=headers, params=payload, status=201)
|
|
|
|
resp = app.get('/api/users/?email={}'.format(email))
|
|
assert len(resp.json['results']) == 2
|
|
|
|
user2.mark_as_deleted()
|
|
resp = app.get('/api/users/?email={}'.format(email))
|
|
assert len(resp.json['results']) == 1
|
|
|
|
settings.A2_EMAIL_IS_UNIQUE = True
|
|
# email already used
|
|
resp = app.post_json('/api/users/', headers=headers, params=payload, status=400)
|
|
assert resp.json['errors'] == {'email': ['email already used']}
|
|
|
|
|
|
def test_api_register_mark_as_deleted(app, settings, admin):
|
|
settings.A2_EMAIL_IS_UNIQUE = True
|
|
|
|
user = User.objects.create(username='foo', email='john.doe@example.com', ou=get_default_ou())
|
|
|
|
headers = basic_authorization_header(admin)
|
|
payload = {
|
|
'username': 'john.doe',
|
|
'email': 'john.doe@example.com',
|
|
'first_name': 'John',
|
|
'last_name': 'Doe',
|
|
'password': '12XYab',
|
|
'no_email_validation': True,
|
|
'return_url': 'http://sp.example.com/validate/',
|
|
'ou': 'default',
|
|
}
|
|
response = app.post_json(
|
|
reverse('a2-api-register'), params=payload, headers=headers, status=400)
|
|
assert response.json['errors'] == {'__all__': ['Account already exists']}
|
|
|
|
user.mark_as_deleted()
|
|
response = app.post_json(
|
|
reverse('a2-api-register'), params=payload, headers=headers, status=201)
|
|
|
|
|
|
def test_api_password_change_mark_as_deleted(app, settings, admin, ou1):
|
|
app.authorization = ('Basic', (admin.username, admin.username))
|
|
user1 = User.objects.create(
|
|
username='john.doe', email='john.doe@example.com', ou=ou1)
|
|
user1.set_password('password')
|
|
user1.save()
|
|
user2 = User.objects.create(
|
|
username='john.doe2', email='john.doe@example.com', ou=ou1)
|
|
user2.set_password('password')
|
|
user1.save()
|
|
|
|
payload = {
|
|
'email': 'john.doe@example.com',
|
|
'ou': ou1.slug,
|
|
'old_password': 'password',
|
|
'new_password': 'password2',
|
|
}
|
|
url = reverse('a2-api-password-change')
|
|
response = app.post_json(url, params=payload, status=400)
|
|
user2.mark_as_deleted()
|
|
response = app.post_json(url, params=payload)
|
|
assert User.objects.get(username='john.doe').check_password('password2')
|
|
|
|
|
|
@pytest.mark.skipif(drf_version.startswith('3.4'), reason='no support for old django rest framework')
|
|
def test_api_statistics_list(app, admin):
|
|
headers = basic_authorization_header(admin)
|
|
resp = app.get('/api/statistics/', headers=headers)
|
|
assert len(resp.json['data']) == 6
|
|
login_stats = {
|
|
'name': 'Login count by authentication type',
|
|
'url': 'http://testserver/api/statistics/login/',
|
|
'id': 'login',
|
|
'filters': [
|
|
{
|
|
"id": "time_interval",
|
|
"label": "Time interval",
|
|
"options": [
|
|
{"id": "day", "label": "Day"},
|
|
{"id": "month", "label": "Month"},
|
|
{"id": "year", "label": "Year"},
|
|
],
|
|
"required": True,
|
|
"default": "month",
|
|
},
|
|
{
|
|
'id': 'ou',
|
|
'label': 'Organizational Unit',
|
|
'options': [{'id': 'default', 'label': 'Default organizational unit'}],
|
|
},
|
|
{'id': 'service', 'label': 'Service', 'options': []},
|
|
],
|
|
}
|
|
assert login_stats in resp.json['data']
|
|
assert {
|
|
'name': 'Login count by service',
|
|
'url': 'http://testserver/api/statistics/service_login/',
|
|
'id': 'service-login',
|
|
'filters': [
|
|
{
|
|
"id": "time_interval",
|
|
"label": "Time interval",
|
|
"options": [
|
|
{"id": "day", "label": "Day"},
|
|
{"id": "month", "label": "Month"},
|
|
{"id": "year", "label": "Year"},
|
|
],
|
|
"required": True,
|
|
"default": "month",
|
|
},
|
|
],
|
|
} in resp.json['data']
|
|
|
|
service = Service.objects.create(name='Service1', slug='service1', ou=get_default_ou())
|
|
service = Service.objects.create(name='Service2', slug='service2', ou=get_default_ou())
|
|
login_stats['filters'][2]['options'].append({'id': 'service1 default', 'label': 'Service1'})
|
|
login_stats['filters'][2]['options'].append({'id': 'service2 default', 'label': 'Service2'})
|
|
|
|
resp = app.get('/api/statistics/', headers=headers)
|
|
assert login_stats in resp.json['data']
|
|
|
|
|
|
@pytest.mark.skipif(drf_version.startswith('3.4'), reason='no support for old django rest framework')
|
|
@pytest.mark.parametrize(
|
|
'event_type_name,event_name', [('user.login', 'login'), ('user.registration', 'registration')]
|
|
)
|
|
def test_api_statistics(app, admin, freezer, event_type_name, event_name):
|
|
headers = basic_authorization_header(admin)
|
|
|
|
resp = app.get('/api/statistics/login/?time_interval=month', headers=headers)
|
|
assert resp.json == {"data": {"series": [], "x_labels": []}, "err": 0}
|
|
|
|
user = User.objects.create(username='john.doe', email='john.doe@example.com')
|
|
portal = Service.objects.create(name='portal', slug='portal', ou=get_default_ou())
|
|
agendas = Service.objects.create(name='agendas', slug='agendas', ou=get_default_ou())
|
|
|
|
method = {'how': 'password-on-https'}
|
|
method2 = {'how': 'fc'}
|
|
|
|
event_type = EventType.objects.get_for_name(event_type_name)
|
|
event_type_definition = event_type.definition
|
|
|
|
freezer.move_to('2020-02-03 12:00')
|
|
event = Event.objects.create(type=event_type, references=[portal], data=method)
|
|
event = Event.objects.create(type=event_type, references=[agendas], data=method)
|
|
|
|
freezer.move_to('2020-03-04 13:00')
|
|
event = Event.objects.create(type=event_type, references=[agendas], data=method)
|
|
event = Event.objects.create(type=event_type, references=[portal], data=method2)
|
|
|
|
resp = app.get('/api/statistics/%s/?time_interval=month' % event_name, headers=headers)
|
|
data = resp.json['data']
|
|
data['series'].sort(key=lambda x: x['label'])
|
|
assert data == {
|
|
'x_labels': ['2020-02', '2020-03'],
|
|
'series': [{'label': 'FranceConnect', 'data': [None, 1]}, {'label': 'password', 'data': [2, 1]}],
|
|
}
|
|
|
|
# default time interval is 'month'
|
|
month_data = data
|
|
resp = app.get('/api/statistics/%s/' % event_name, headers=headers)
|
|
data = resp.json['data']
|
|
data['series'].sort(key=lambda x: x['label'])
|
|
assert month_data == data
|
|
|
|
resp = app.get('/api/statistics/%s/?time_interval=month&ou=default' % event_name, headers=headers)
|
|
data = resp.json['data']
|
|
data['series'].sort(key=lambda x: x['label'])
|
|
assert data == {
|
|
'x_labels': ['2020-02', '2020-03'],
|
|
'series': [{'label': 'FranceConnect', 'data': [None, 1]}, {'label': 'password', 'data': [2, 1]}],
|
|
}
|
|
|
|
resp = app.get(
|
|
'/api/statistics/%s/?time_interval=month&service=agendas default' % event_name, headers=headers
|
|
)
|
|
data = resp.json['data']
|
|
assert data == {'x_labels': ['2020-02', '2020-03'], 'series': [{'label': 'password', 'data': [1, 1]}]}
|
|
|
|
resp = app.get(
|
|
'/api/statistics/%s/?time_interval=month&start=2020-03-01T01:01' % event_name, headers=headers
|
|
)
|
|
data = resp.json['data']
|
|
data['series'].sort(key=lambda x: x['label'])
|
|
assert data == {
|
|
'x_labels': ['2020-03'],
|
|
'series': [{'label': 'FranceConnect', 'data': [1]}, {'label': 'password', 'data': [1]}],
|
|
}
|
|
|
|
resp = app.get(
|
|
'/api/statistics/%s/?time_interval=month&end=2020-03-01T01:01' % event_name, headers=headers
|
|
)
|
|
data = resp.json['data']
|
|
assert data == {'x_labels': ['2020-02'], 'series': [{'label': 'password', 'data': [2]}]}
|
|
|
|
resp = app.get(
|
|
'/api/statistics/%s/?time_interval=month&end=2020-03-01' % event_name, headers=headers
|
|
)
|
|
data = resp.json['data']
|
|
assert data == {'x_labels': ['2020-02'], 'series': [{'label': 'password', 'data': [2]}]}
|
|
|
|
resp = app.get(
|
|
'/api/statistics/%s/?time_interval=year&service=portal default' % event_name, headers=headers
|
|
)
|
|
data = resp.json['data']
|
|
data['series'].sort(key=lambda x: x['label'])
|
|
assert data == {
|
|
'x_labels': ['2020'],
|
|
'series': [{'label': 'FranceConnect', 'data': [1]}, {'label': 'password', 'data': [1]}],
|
|
}
|
|
|
|
resp = app.get('/api/statistics/service_%s/?time_interval=month' % event_name, headers=headers)
|
|
data = resp.json['data']
|
|
data['series'].sort(key=lambda x: x['label'])
|
|
assert data == {
|
|
'x_labels': ['2020-02', '2020-03'],
|
|
'series': [{'label': 'agendas', 'data': [1, 1]}, {'label': 'portal', 'data': [1, 1]}],
|
|
}
|
|
|
|
resp = app.get('/api/statistics/service_ou_%s/?time_interval=month' % event_name, headers=headers)
|
|
data = resp.json['data']
|
|
assert data == {
|
|
'x_labels': ['2020-02', '2020-03'],
|
|
'series': [{'label': 'Default organizational unit', 'data': [2, 2]}],
|
|
}
|
|
|
|
|
|
def test_api_statistics_no_crash_older_drf(app, admin):
|
|
headers = basic_authorization_header(admin)
|
|
expected_status = 200 if drf_version > '3.9' else 404
|
|
resp = app.get('/api/statistics/login/?time_interval=month', headers=headers, status=expected_status)
|
|
|
|
|
|
def test_find_duplicates_put(app, admin, settings):
|
|
app.authorization = ('Basic', (admin.username, admin.username))
|
|
app.put_json('/api/users/find_duplicates/', params={'first_name': 'Eleonore', 'last_name': 'aeiou'}, status=405)
|