tests: split user synchronization API tests (#67901)

This commit is contained in:
Benjamin Dauvergne 2022-10-07 10:53:22 +02:00
parent 0cb14c0138
commit 5a592baf31
2 changed files with 171 additions and 148 deletions

View File

@ -17,22 +17,18 @@
import datetime
import json
import random
import uuid
from unittest import mock
import django
import pytest
from django.contrib.auth import get_user_model
from django.contrib.auth.hashers import check_password
from django.contrib.contenttypes.models import ContentType
from django.core import mail
from django.urls import reverse
from django.utils.encoding import force_str
from django.utils.text import slugify
from requests.models import Response
from authentic2.a2_rbac.models import SEARCH_OP
from authentic2.a2_rbac.models import OrganizationalUnit as OU
from authentic2.a2_rbac.models import Role
from authentic2.a2_rbac.utils import get_default_ou
@ -1128,150 +1124,6 @@ def test_register_ou_no_email_validation(settings, app, admin, django_user_model
assert user.check_password(password)
def test_user_synchronization(app, simple_user):
headers = basic_authorization_header(simple_user)
uuids = []
for _ in range(100):
user = User.objects.create(first_name='ben', last_name='dauve')
uuids.append(user.uuid)
unknown_uuids = [uuid.uuid4().hex for i in range(100)]
url = reverse('a2-api-users-synchronization')
content = {
'known_uuids': uuids + unknown_uuids,
}
random.shuffle(content['known_uuids'])
response = app.post_json(url, params=content, headers=headers, status=403)
# give custom_user.search_user permission to user
r = Role.objects.get_admin_role(
ContentType.objects.get_for_model(User), name='role', slug='role', operation=SEARCH_OP
)
r.members.add(simple_user)
response = app.post_json(url, params=content, headers=headers)
assert response.json['result'] == 1
assert set(response.json['unknown_uuids']) == set(unknown_uuids)
def test_user_synchronization_full(app, admin):
headers = basic_authorization_header(admin)
uuids = []
for _ in range(100):
user = User.objects.create(first_name='jim', last_name='jam')
uuids.append(user.uuid)
unknown_uuids = [uuid.uuid4().hex for i in range(100)]
url = reverse('a2-api-users-synchronization')
content = {
'known_uuids': uuids + unknown_uuids,
'full_known_users': 1,
}
random.shuffle(content['known_uuids'])
response = app.post_json(url, params=content, headers=headers)
assert response.json['result'] == 1
# known users returned as part of api's full mode:
assert len(response.json['known_users']) == 100
for user_dict in response.json['known_users']:
assert user_dict['first_name'] == 'jim'
assert user_dict['last_name'] == 'jam'
assert {
'uuid',
'email',
'is_staff',
'is_superuser',
'email_verified',
'ou',
'is_active',
'deactivation',
'modified',
}.issubset(set(user_dict.keys()))
def test_user_synchronization_timestamp(app, admin):
headers = basic_authorization_header(admin)
url = reverse('a2-api-users-synchronization')
now = datetime.datetime.now()
ou = get_default_ou()
User = get_user_model()
users = []
for i in range(6):
users.append(
User.objects.create(
first_name='john%s' % i,
last_name='doe',
username='user%s' % i,
email='user%s' % i,
ou=ou,
)
)
for i, event_name in enumerate(
[
'manager.user.creation',
'manager.user.profile.edit',
'manager.user.activation',
'manager.user.deactivation',
'manager.user.password.change.force',
'manager.user.password.change.unforce',
]
):
event_type = EventType.objects.get_for_name(event_name)
Event.objects.create(
type=event_type,
timestamp=now - datetime.timedelta(days=i, hours=1),
references=[users[i]],
)
content = {
'known_uuids': [user.uuid for user in users],
'timestamp': (now - datetime.timedelta(days=3)).isoformat(),
}
response = app.post(url, params=content, headers=headers)
for user in users[:3]:
assert user.uuid in response.json['modified_users_uuids']
assert user.uuid not in response.json['unknown_uuids']
for user in users[3:]:
assert user.uuid not in response.json['modified_users_uuids']
assert user.uuid not in response.json['unknown_uuids']
for user in users[:3]:
user.delete()
content['timestamp'] = (now - datetime.timedelta(days=7)).isoformat()
response = app.post(url, params=content, headers=headers)
for user in users[:3]:
assert user.uuid not in response.json['modified_users_uuids']
assert user.uuid in response.json['unknown_uuids']
for user in users[3:]:
assert user.uuid in response.json['modified_users_uuids']
assert user.uuid not in response.json['unknown_uuids']
for user in users[3:]:
user.delete()
response = app.post(url, params=content, headers=headers)
assert not response.json['modified_users_uuids']
for user in users:
assert user.uuid in response.json['unknown_uuids']
for user in users[:3]:
content['known_uuids'].remove(user.uuid)
response = app.post(url, params=content, headers=headers)
assert not response.json['modified_users_uuids']
assert len(response.json['unknown_uuids']) == 3
for user in users[3:]:
assert user.uuid in response.json['unknown_uuids']
def test_api_drf_authentication_class(app, admin, user_ou1, oidc_client):
url = '/api/users/%s/' % user_ou1.uuid
# test invalid client

View File

@ -0,0 +1,171 @@
# authentic2 - versatile identity manager
# Copyright (C) 2010-2022 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import random
import uuid
from django.contrib.contenttypes.models import ContentType
from django.urls import reverse
from authentic2.a2_rbac.models import SEARCH_OP, Role
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.apps.journal.models import Event, EventType
from authentic2.custom_user.models import User
from ..utils import basic_authorization_header
def test_basic(app, simple_user):
headers = basic_authorization_header(simple_user)
uuids = []
for _ in range(100):
user = User.objects.create(first_name='ben', last_name='dauve')
uuids.append(user.uuid)
unknown_uuids = [uuid.uuid4().hex for i in range(100)]
url = reverse('a2-api-users-synchronization')
content = {
'known_uuids': uuids + unknown_uuids,
}
random.shuffle(content['known_uuids'])
# test permission check
response = app.post_json(url, params=content, headers=headers, status=403)
r = Role.objects.get_admin_role(
ContentType.objects.get_for_model(User), name='role', slug='role', operation=SEARCH_OP
)
r.members.add(simple_user)
response = app.post_json(url, params=content, headers=headers)
assert response.json['result'] == 1
assert set(response.json['unknown_uuids']) == set(unknown_uuids)
def test_full_known_users(app, admin):
headers = basic_authorization_header(admin)
uuids = []
for _ in range(100):
user = User.objects.create(first_name='jim', last_name='jam')
uuids.append(user.uuid)
unknown_uuids = [uuid.uuid4().hex for i in range(100)]
url = reverse('a2-api-users-synchronization')
content = {
'known_uuids': uuids + unknown_uuids,
'full_known_users': 1,
}
random.shuffle(content['known_uuids'])
response = app.post_json(url, params=content, headers=headers)
assert response.json['result'] == 1
# known users returned as part of api's full mode:
assert len(response.json['known_users']) == 100
for user_dict in response.json['known_users']:
assert user_dict['first_name'] == 'jim'
assert user_dict['last_name'] == 'jam'
assert {
'uuid',
'email',
'is_staff',
'is_superuser',
'email_verified',
'ou',
'is_active',
'deactivation',
'modified',
}.issubset(set(user_dict.keys()))
def test_timestamp(app, admin):
headers = basic_authorization_header(admin)
url = reverse('a2-api-users-synchronization')
now = datetime.datetime.now()
ou = get_default_ou()
users = []
for i in range(6):
users.append(
User.objects.create(
first_name='john%s' % i,
last_name='doe',
username='user%s' % i,
email='user%s' % i,
ou=ou,
)
)
for i, event_name in enumerate(
[
'manager.user.creation',
'manager.user.profile.edit',
'manager.user.activation',
'manager.user.deactivation',
'manager.user.password.change.force',
'manager.user.password.change.unforce',
]
):
event_type = EventType.objects.get_for_name(event_name)
Event.objects.create(
type=event_type,
timestamp=now - datetime.timedelta(days=i, hours=1),
references=[users[i]],
)
content = {
'known_uuids': [user.uuid for user in users],
'timestamp': (now - datetime.timedelta(days=3)).isoformat(),
}
response = app.post(url, params=content, headers=headers)
for user in users[:3]:
assert user.uuid in response.json['modified_users_uuids']
assert user.uuid not in response.json['unknown_uuids']
for user in users[3:]:
assert user.uuid not in response.json['modified_users_uuids']
assert user.uuid not in response.json['unknown_uuids']
for user in users[:3]:
user.delete()
content['timestamp'] = (now - datetime.timedelta(days=7)).isoformat()
response = app.post(url, params=content, headers=headers)
for user in users[:3]:
assert user.uuid not in response.json['modified_users_uuids']
assert user.uuid in response.json['unknown_uuids']
for user in users[3:]:
assert user.uuid in response.json['modified_users_uuids']
assert user.uuid not in response.json['unknown_uuids']
for user in users[3:]:
user.delete()
response = app.post(url, params=content, headers=headers)
assert not response.json['modified_users_uuids']
for user in users:
assert user.uuid in response.json['unknown_uuids']
for user in users[:3]:
content['known_uuids'].remove(user.uuid)
response = app.post(url, params=content, headers=headers)
assert not response.json['modified_users_uuids']
assert len(response.json['unknown_uuids']) == 3
for user in users[3:]:
assert user.uuid in response.json['unknown_uuids']