authentic/tests/test_all.py

704 lines
29 KiB
Python

# authentic2 - versatile identity manager
# Copyright (C) 2010-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import json
import urllib.parse
import pytest
from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType
from django.contrib.sessions.backends.cache import SessionStore
from django.core import mail
from django.core.serializers.json import DjangoJSONEncoder
from django.test import TestCase
from django.test.client import Client
from django.test.utils import override_settings
from django.urls import reverse
from django.utils.encoding import force_text
from django.utils.translation import ugettext as _
from rest_framework import status, test
from authentic2 import attribute_kinds, models
from authentic2.utils import misc as utils_misc
from authentic2.utils.misc import continue_to_next_url, login_require, make_url, redirect, redirect_to_login
from django_rbac.utils import get_ou_model, get_role_model
from .utils import Authentic2TestCase, assert_event, get_link_from_mail, get_response_form
class SerializerTests(TestCase):
def test_generic_foreign_key_natural_key(self):
import json
from django.core import serializers
from authentic2.models import Attribute, AttributeValue
User = get_user_model()
ucount = User.objects.count()
acount = Attribute.objects.count()
u = User.objects.create(username='john.doe')
avcount = AttributeValue.objects.count()
a = Attribute.objects.create(name='phone', label='phone', kind='string')
av = AttributeValue.objects.create(owner=u, attribute=a, content='0101010101')
self.assertEqual(User.objects.count(), ucount + 1)
self.assertEqual(Attribute.objects.count(), acount + 1)
self.assertEqual(AttributeValue.objects.count(), avcount + 1)
s = serializers.get_serializer('json')()
s.serialize([u, a, av], use_natural_foreign_keys=True, use_natural_primary_keys=True)
result = s.getvalue()
u.delete()
a.delete()
self.assertEqual(User.objects.count(), ucount)
self.assertEqual(Attribute.objects.count(), acount)
self.assertEqual(AttributeValue.objects.count(), 0)
expected = [
{
'model': 'custom_user.user',
'fields': {
'uuid': u.uuid,
'email_verified': False,
'username': 'john.doe',
'email': '',
'first_name': '',
'last_name': '',
'is_active': True,
'is_staff': False,
'is_superuser': False,
'last_login': u.last_login,
'last_account_deletion_alert': None,
'date_joined': u.date_joined,
'modified': u.modified,
'groups': [],
'user_permissions': [],
'password': '',
'ou': None,
'deactivation': None,
'deactivation_reason': None,
},
},
{
'model': 'authentic2.attribute',
'fields': {
'description': '',
'name': 'phone',
'label': 'phone',
'kind': 'string',
'user_editable': False,
'asked_on_registration': False,
'multiple': False,
'user_visible': False,
'required': False,
'disabled': False,
'searchable': False,
'order': 0,
'scopes': '',
'required_on_login': False,
},
},
{
'model': 'authentic2.attributevalue',
'fields': {
'owner': [['custom_user', 'user'], [u.uuid]],
'attribute': ['phone'],
'content': '0101010101',
'multiple': False,
'verified': False,
'search_vector': None,
},
},
]
expected = json.loads(json.dumps(expected, cls=DjangoJSONEncoder))
for obj in serializers.deserialize('json', result):
obj.save()
self.assertEqual(json.loads(result), expected)
self.assertEqual(User.objects.count(), ucount + 1)
self.assertEqual(Attribute.objects.count(), acount + 1)
# first_name and last_name attribute value not recreated since they were not dumped
self.assertEqual(AttributeValue.objects.count(), 1)
class UtilsTests(Authentic2TestCase):
def test_assert_equals_url(self):
self.assertEqualsURL('/test?coin=1&bob=2&coin=3', '/test?bob=2&coin=1&coin=3')
def test_make_url(self):
self.assertEqualsURL(make_url('../coin'), '../coin')
self.assertEqualsURL(make_url('../boob', params={'next': '..'}), '../boob?next=..')
self.assertEqualsURL(
make_url('../boob', params={'next': '..'}, append={'xx': 'yy'}), '../boob?xx=yy&next=..'
)
self.assertEqualsURL(
make_url('../boob', params={'next': '..'}, append={'next': 'yy'}), '../boob?next=..&next=yy'
)
self.assertEqualsURL(make_url('auth_login', params={'next': '/zob'}), '/login/?next=%2Fzob')
self.assertEqualsURL(
make_url('auth_login', params={'next': '/zob'}, fragment='a2-panel'),
'/login/?next=%2Fzob#a2-panel',
)
def test_redirect(self):
from django.test.client import RequestFactory
rf = RequestFactory()
request = rf.get('/coin', data={'next': '..'})
request2 = rf.get('/coin', data={'next': '..', 'token': 'xxx'})
response = redirect(request, '/boob/', keep_params=True)
self.assertEqualsURL(response['Location'], '/boob/?next=..')
response = redirect(request, '/boob/', keep_params=True, exclude=['next'])
self.assertEqualsURL(response['Location'], '/boob/')
response = redirect(request2, '/boob/', keep_params=True)
self.assertEqualsURL(response['Location'], '/boob/?token=xxx&next=..')
response = redirect(request, '/boob/', keep_params=True, exclude=['token'])
self.assertEqualsURL(response['Location'], '/boob/?next=..')
response = redirect(request, '/boob/', keep_params=True, include=['next'])
self.assertEqualsURL(response['Location'], '/boob/?next=..')
response = redirect(request, '/boob/', keep_params=True, include=['next'], params={'token': 'uuu'})
self.assertEqualsURL(response['Location'], '/boob/?token=uuu&next=..')
def test_redirect_to_login(self):
from django.test.client import RequestFactory
rf = RequestFactory()
request = rf.get('/coin', data={'next': '..'})
response = redirect_to_login(request)
self.assertEqualsURL(response['Location'], '/login/?next=..')
def test_continue_to_next_url(self):
from django.test.client import RequestFactory
rf = RequestFactory()
request = rf.get('/coin', data={'next': '/zob/', 'nonce': 'xxx'})
response = continue_to_next_url(request)
self.assertEqualsURL(response['Location'], '/zob/?nonce=xxx')
def test_login_require(self):
from django.test.client import RequestFactory
rf = RequestFactory()
request = rf.get('/coin', data={'next': '/zob/', 'nonce': 'xxx'})
request.session = SessionStore()
response = login_require(request, login_hint=['backoffice'])
self.assertEqualsURL(response['Location'].split('?', 1)[0], '/login/')
self.assertEqualsURL(
urllib.parse.parse_qs(response['Location'].split('?', 1)[1])['next'][0],
'/coin?nonce=xxx&next=/zob/',
)
self.assertEqual(request.session['login-hint'], ['backoffice'])
class UserProfileTests(TestCase):
def setUp(self):
User = get_user_model()
user = User.objects.create(username='testbot')
user.set_password('secret')
user.save()
self.user = user
self.client = Client()
def test_edit_profile_attributes(self):
# disable existing attributes
models.Attribute.objects.update(disabled=True)
models.Attribute.objects.create(
label='custom',
name='custom',
required=True,
user_visible=True,
user_editable=True,
kind='string',
)
models.Attribute.objects.create(
label='ID', name='national_number', user_editable=True, user_visible=True, kind='string'
)
self.assertTrue(self.client.login(request=None, username='testbot', password='secret'))
# get the edit page in order to check form's prefix
response = self.client.get(reverse('profile_edit'))
form = get_response_form(response)
kwargs = {'custom': 'random data', 'national_number': 'xx20153566342yy'}
if form.prefix:
kwargs = {'%s-%s' % (form.prefix, k): v for k, v in kwargs.items()}
response = self.client.post(reverse('profile_edit'), kwargs)
new = {'custom': 'random data', 'next_url': '', 'national_number': 'xx20153566342yy'}
assert_event('user.profile.edit', user=self.user, session=self.client.session, old={}, new=new)
self.assertEqual(response.status_code, 302)
response = self.client.get(reverse('account_management'))
self.assertContains(response, 'random data')
self.assertContains(response, 'xx20153566342yy')
response = self.client.get(reverse('profile_edit'))
form = get_response_form(response)
self.assertEqual(form['custom'].value(), 'random data')
self.assertEqual(form['national_number'].value(), 'xx20153566342yy')
def test_noneditable_profile_attributes(self):
"""
tests if user non editable attributes do not appear in profile form
"""
# disable existing attributes
models.Attribute.objects.update(disabled=True)
models.Attribute.objects.create(
label='custom', name='custom', required=False, user_editable=False, kind='string'
)
models.Attribute.objects.create(
label='ID', name='national_number', user_editable=False, user_visible=False, kind='string'
)
self.assertTrue(self.client.login(request=None, username='testbot', password='secret'))
response = self.client.get(reverse('profile_edit'))
form = get_response_form(response)
self.assertEqual(set(form.fields), {'next_url'})
class CacheTests(TestCase):
@pytest.fixture(autouse=True)
def cache_settings(self, settings):
settings.A2_CACHE_ENABLED = True
@override_settings(ROOT_URLCONF='tests.cache_urls')
def test_cache_decorator_base(self):
import random
from authentic2.decorators import CacheDecoratorBase
class GlobalCache(CacheDecoratorBase):
def __init__(self, *args, **kwargs):
self.cache = {}
super().__init__(*args, **kwargs)
def set(self, key, value):
self.cache[key] = value
def get(self, key):
return self.cache.get(key, (None, None))
def delete(self, key, value):
if key in self.cache and self.cache[key] == value:
del self.cache[key]
def f():
return random.random()
def f2(a, b):
return a
# few chances the same value comme two times in a row
self.assertNotEqual(f(), f())
# with cache the same value will come back
g = GlobalCache(f, hostname_vary=False)
values = set()
for x in range(10):
values.add(g())
self.assertEqual(len(values), 1)
# with and hostname vary 10 values will come back
g = GlobalCache(f, hostname_vary=True)
values = set()
for x in range(10):
values.add(g())
self.assertEqual(len(values), 10)
# null timeout, no cache
h = GlobalCache(timeout=0)(f)
self.assertNotEqual(h(), h())
# vary on second arg
i = GlobalCache(hostname_vary=False, args=(1,))(f2)
for a in range(1, 10):
self.assertEqual(i(a, 1), 1)
for a in range(2, 10):
self.assertEqual(i(a, a), a)
@override_settings(ROOT_URLCONF='tests.cache_urls')
def test_django_cache(self):
response1 = self.client.get('/django_cache/', HTTP_HOST='cache1.example.com')
response2 = self.client.get('/django_cache/', HTTP_HOST='cache2.example.com')
response3 = self.client.get('/django_cache/', HTTP_HOST='cache1.example.com')
self.assertNotEqual(response1.content, response2.content)
self.assertEqual(response1.content, response3.content)
@override_settings(ROOT_URLCONF='tests.cache_urls')
def test_session_cache(self):
client = Client()
response1 = client.get('/session_cache/')
response2 = client.get('/session_cache/')
client = Client()
response3 = client.get('/session_cache/')
self.assertEqual(response1.content, response2.content)
self.assertNotEqual(response1.content, response3.content)
class AttributeKindsTest(TestCase):
def test_simple(self):
from django import forms
from django.core.exceptions import ValidationError
with self.settings(
A2_ATTRIBUTE_KINDS=[
{
'label': 'integer',
'name': 'integer',
'field_class': forms.IntegerField,
}
]
):
title_field = attribute_kinds.get_form_field('title')
self.assertTrue(isinstance(title_field, forms.ChoiceField))
self.assertTrue(isinstance(title_field.widget, forms.RadioSelect))
self.assertIsNotNone(title_field.choices)
self.assertTrue(isinstance(attribute_kinds.get_form_field('string'), forms.CharField))
self.assertEqual(attribute_kinds.get_kind('string')['name'], 'string')
self.assertTrue(isinstance(attribute_kinds.get_form_field('integer'), forms.IntegerField))
self.assertEqual(attribute_kinds.get_kind('integer')['name'], 'integer')
attribute_kinds.validate_siret('49108189900024')
with self.assertRaises(ValidationError):
attribute_kinds.validate_siret('49108189900044')
with self.assertRaises(KeyError):
attribute_kinds.get_form_field('integer')
with self.assertRaises(KeyError):
attribute_kinds.get_kind('integer')
fields = {}
for i, name in enumerate(attribute_kinds.get_attribute_kinds()):
fields['field_%d' % i] = attribute_kinds.get_form_field(name)
AttributeKindForm = type('AttributeKindForm', (forms.Form,), fields)
str(AttributeKindForm().as_p())
class APITest(TestCase):
def setUp(self):
User = get_user_model()
Role = get_role_model()
OU = get_ou_model()
ct_user = ContentType.objects.get_for_model(User)
self.ou = OU.objects.create(slug='ou', name='OU', email_is_unique=True, username_is_unique=True)
self.reguser1 = User.objects.create(username='reguser1')
self.reguser1.set_password('password')
self.reguser1.save()
cred = '%s:%s' % (self.reguser1.username, 'password')
cred = cred.encode('utf-8')
self.reguser1_cred = base64.b64encode(cred).decode('ascii')
self.user_admin_role = Role.objects.get_admin_role(
instance=ct_user, name='user admin', slug='user-admin'
)
self.reguser1.roles.add(self.user_admin_role)
self.reguser2 = User.objects.create(username='reguser2', password='password')
self.reguser2.set_password('password')
self.reguser2.save()
cred = '%s:%s' % (self.reguser2.username, 'password')
cred = cred.encode('utf-8')
self.reguser2_cred = base64.b64encode(cred).decode('ascii')
self.ou_user_admin_role = Role.objects.get_admin_role(
instance=ct_user, name='user admin', slug='user-admin', ou=self.ou
)
self.ou_user_admin_role.members.add(self.reguser2)
self.reguser3 = User.objects.create(username='reguser3', password='password', is_superuser=True)
self.reguser3.set_password('password')
self.reguser3.save()
cred = '%s:%s' % (self.reguser3.username, 'password')
cred = cred.encode('utf-8')
self.reguser3_cred = base64.b64encode(cred).decode('ascii')
def test_register_reguser1(self):
self.register_with_user(self.reguser1, self.reguser1_cred)
def test_register_reguser2(self):
self.register_with_user(self.reguser2, self.reguser2_cred)
def test_register_reguser3(self):
self.register_with_user(self.reguser3, self.reguser3_cred)
@override_settings(A2_REQUIRED_FIELDS=['username'])
def register_with_user(self, user, cred):
from django.contrib.auth import get_user_model
from rest_framework import status, test
# disable existing attributes
models.Attribute.objects.update(disabled=True)
User = get_user_model()
user_count = User.objects.count()
client = test.APIClient()
password = '12=XY=ab'
username = 'john.doe'
email = 'john.doe@example.com'
return_url = 'http://sp.org/register/'
payload = {
'email': email,
'username': username,
'ou': self.ou.slug,
'password': password,
'return_url': return_url,
}
outbox_level = len(mail.outbox)
client.credentials(HTTP_AUTHORIZATION='Basic %s' % cred)
response = client.post(
reverse('a2-api-register'), content_type='application/json', data=json.dumps(payload)
)
self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
self.assertIn('result', response.data)
self.assertEqual(response.data['result'], 1)
self.assertIn('token', response.data)
token = response.data['token']
self.assertEqual(len(mail.outbox), outbox_level + 1)
# User side
client = Client()
activation_url = get_link_from_mail(mail.outbox[-1])
response = client.get(activation_url, follow=True)
self.assertEqual(response.status_code, status.HTTP_200_OK)
assert utils_misc.make_url(return_url, params={'token': token}) in force_text(response.content)
self.assertEqual(User.objects.count(), user_count + 1)
response = client.get(reverse('auth_homepage'))
self.assertContains(response, username)
last_user = User.objects.order_by('id').last()
self.assertEqual(last_user.username, username)
self.assertEqual(last_user.email, email)
self.assertEqual(last_user.ou.slug, self.ou.slug)
self.assertTrue(last_user.check_password(password))
# Test email is unique with case change
client = test.APIClient()
client.credentials(HTTP_AUTHORIZATION='Basic %s' % cred)
payload = {
'email': email.upper(),
'username': username + '1',
'ou': self.ou.slug,
'password': password,
'return_url': return_url,
}
response = client.post(
reverse('a2-api-register'), content_type='application/json', data=json.dumps(payload)
)
self.assertEqual(response.data['errors']['__all__'], [_('Account already exists in this ou')])
# Username is required
payload = {
'email': '1' + email,
'ou': self.ou.slug,
'password': password,
'return_url': return_url,
}
response = client.post(
reverse('a2-api-register'), content_type='application/json', data=json.dumps(payload)
)
self.assertEqual(response.data['errors']['__all__'], [_('Username is required')])
# Test username is unique
payload = {
'email': '1' + email,
'username': username,
'ou': self.ou.slug,
'password': password,
'return_url': return_url,
}
response = client.post(
reverse('a2-api-register'), content_type='application/json', data=json.dumps(payload)
)
self.assertEqual(response.data['errors']['__all__'], [_('Account already exists')])
def test_register_reguser2_wrong_ou(self):
client = test.APIClient()
password = '12=XY=ab'
username = 'john.doe'
email = 'john.doe@example.com'
return_url = 'http://sp.org/register/'
payload = {
'email': email,
'username': username,
'ou': 'default',
'password': password,
'return_url': return_url,
}
client.credentials(HTTP_AUTHORIZATION='Basic %s' % self.reguser2_cred)
response = client.post(
reverse('a2-api-register'), content_type='application/json', data=json.dumps(payload)
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertIn('errors', response.data)
@override_settings(A2_REQUIRED_FIELDS=['username'])
def test_email_is_unique_double_registration(self):
from django.contrib.auth import get_user_model
from rest_framework import status, test
# disable existing attributes
models.Attribute.objects.update(disabled=True)
user = self.reguser3
cred = self.reguser3_cred
User = get_user_model()
user_count = User.objects.count()
client = test.APIClient()
password = '12=XY=ab'
username = 'john.doe'
email = 'john.doe@example.com'
return_url = 'http://sp.org/register/'
payload = {
'email': email,
'username': username,
'ou': self.ou.slug,
'password': password,
'return_url': return_url,
}
outbox_level = len(mail.outbox)
client.credentials(HTTP_AUTHORIZATION='Basic %s' % cred)
response = client.post(
reverse('a2-api-register'), content_type='application/json', data=json.dumps(payload)
)
self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
self.assertIn('result', response.data)
self.assertEqual(response.data['result'], 1)
self.assertIn('token', response.data)
token = response.data['token']
self.assertEqual(len(mail.outbox), outbox_level + 1)
outbox_level = len(mail.outbox)
# User side - user click on email
client = Client()
activation_url = get_link_from_mail(mail.outbox[0])
response = client.get(activation_url, follow=True)
self.assertEqual(response.status_code, status.HTTP_200_OK)
assert utils_misc.make_url(return_url, params={'token': token}) in force_text(response.content)
self.assertEqual(User.objects.count(), user_count + 1)
response = client.get(reverse('auth_homepage'))
self.assertContains(response, username)
last_user = User.objects.order_by('id').last()
self.assertEqual(last_user.username, username)
self.assertEqual(last_user.email, email)
self.assertEqual(last_user.ou.slug, self.ou.slug)
self.assertTrue(last_user.check_password(password))
# Test email is unique with case change
client = test.APIClient()
client.credentials(HTTP_AUTHORIZATION='Basic %s' % cred)
payload = {
'email': email.upper(),
'username': username + '1',
'ou': self.ou.slug,
'password': password,
'return_url': return_url,
}
response = client.post(
reverse('a2-api-register'), content_type='application/json', data=json.dumps(payload)
)
self.assertEqual(response.data['errors']['__all__'], [_('Account already exists in this ou')])
# Username is required
payload = {
'email': '1' + email,
'ou': self.ou.slug,
'password': password,
'return_url': return_url,
}
response = client.post(
reverse('a2-api-register'), content_type='application/json', data=json.dumps(payload)
)
self.assertEqual(response.data['errors']['__all__'], [_('Username is required')])
# Test username is unique
payload = {
'email': '1' + email,
'username': username,
'ou': self.ou.slug,
'password': password,
'return_url': return_url,
}
response = client.post(
reverse('a2-api-register'), content_type='application/json', data=json.dumps(payload)
)
self.assertEqual(response.data['errors']['__all__'], [_('Account already exists')])
@override_settings(A2_REQUIRED_FIELDS=['username'])
def test_email_username_is_unique_double_registration(self):
from django.contrib.auth import get_user_model
from rest_framework import status, test
# disable existing attributes
models.Attribute.objects.update(disabled=True)
cred = self.reguser3_cred
User = get_user_model()
user_count = User.objects.count()
client = test.APIClient()
password = '12=XY=ab'
username = 'john.doe'
email = 'john.doe@example.com'
return_url = 'http://sp.org/register/'
payload = {
'email': email,
'username': username,
'ou': self.ou.slug,
'password': password,
'return_url': return_url,
}
outbox_level = len(mail.outbox)
client.credentials(HTTP_AUTHORIZATION='Basic %s' % cred)
response = client.post(
reverse('a2-api-register'), content_type='application/json', data=json.dumps(payload)
)
self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
self.assertIn('result', response.data)
self.assertEqual(response.data['result'], 1)
self.assertIn('token', response.data)
token = response.data['token']
self.assertEqual(len(mail.outbox), outbox_level + 1)
outbox_level = len(mail.outbox)
# Second registration
payload['email'] = 'john.doe2@example.com'
response2 = client.post(
reverse('a2-api-register'), content_type='application/json', data=json.dumps(payload)
)
self.assertEqual(response2.status_code, status.HTTP_202_ACCEPTED)
self.assertIn('result', response2.data)
self.assertEqual(response2.data['result'], 1)
self.assertIn('token', response2.data)
self.assertEqual(len(mail.outbox), outbox_level + 1)
activation_mail1, activation_mail2 = mail.outbox
# User side - user click on first email
client = Client()
activation_url = get_link_from_mail(activation_mail1)
response = client.get(activation_url, follow=True)
self.assertEqual(response.status_code, status.HTTP_200_OK)
assert utils_misc.make_url(return_url, params={'token': token}) in force_text(response.content)
self.assertEqual(User.objects.count(), user_count + 1)
response = client.get(reverse('auth_homepage'))
self.assertContains(response, username)
last_user = User.objects.order_by('id').last()
self.assertEqual(last_user.username, username)
self.assertEqual(last_user.email, email)
self.assertEqual(last_user.ou.slug, self.ou.slug)
self.assertTrue(last_user.check_password(password))
# User click on second email
client = Client()
activation_url = get_link_from_mail(activation_mail2)
response = client.get(activation_url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(response.status_code, 200)
self.assertFormError(
response,
'form',
'username',
_('This username is already in use. Please supply a different username.'),
)