tests: split user synchronization API tests (#67901)
This commit is contained in:
parent
0cb14c0138
commit
5a592baf31
|
@ -17,22 +17,18 @@
|
|||
|
||||
import datetime
|
||||
import json
|
||||
import random
|
||||
import uuid
|
||||
from unittest import mock
|
||||
|
||||
import django
|
||||
import pytest
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.auth.hashers import check_password
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.core import mail
|
||||
from django.urls import reverse
|
||||
from django.utils.encoding import force_str
|
||||
from django.utils.text import slugify
|
||||
from requests.models import Response
|
||||
|
||||
from authentic2.a2_rbac.models import SEARCH_OP
|
||||
from authentic2.a2_rbac.models import OrganizationalUnit as OU
|
||||
from authentic2.a2_rbac.models import Role
|
||||
from authentic2.a2_rbac.utils import get_default_ou
|
||||
|
@ -1128,150 +1124,6 @@ def test_register_ou_no_email_validation(settings, app, admin, django_user_model
|
|||
assert user.check_password(password)
|
||||
|
||||
|
||||
def test_user_synchronization(app, simple_user):
|
||||
headers = basic_authorization_header(simple_user)
|
||||
uuids = []
|
||||
for _ in range(100):
|
||||
user = User.objects.create(first_name='ben', last_name='dauve')
|
||||
uuids.append(user.uuid)
|
||||
unknown_uuids = [uuid.uuid4().hex for i in range(100)]
|
||||
url = reverse('a2-api-users-synchronization')
|
||||
content = {
|
||||
'known_uuids': uuids + unknown_uuids,
|
||||
}
|
||||
random.shuffle(content['known_uuids'])
|
||||
response = app.post_json(url, params=content, headers=headers, status=403)
|
||||
|
||||
# give custom_user.search_user permission to user
|
||||
r = Role.objects.get_admin_role(
|
||||
ContentType.objects.get_for_model(User), name='role', slug='role', operation=SEARCH_OP
|
||||
)
|
||||
|
||||
r.members.add(simple_user)
|
||||
response = app.post_json(url, params=content, headers=headers)
|
||||
assert response.json['result'] == 1
|
||||
assert set(response.json['unknown_uuids']) == set(unknown_uuids)
|
||||
|
||||
|
||||
def test_user_synchronization_full(app, admin):
|
||||
headers = basic_authorization_header(admin)
|
||||
uuids = []
|
||||
for _ in range(100):
|
||||
user = User.objects.create(first_name='jim', last_name='jam')
|
||||
uuids.append(user.uuid)
|
||||
unknown_uuids = [uuid.uuid4().hex for i in range(100)]
|
||||
url = reverse('a2-api-users-synchronization')
|
||||
content = {
|
||||
'known_uuids': uuids + unknown_uuids,
|
||||
'full_known_users': 1,
|
||||
}
|
||||
random.shuffle(content['known_uuids'])
|
||||
response = app.post_json(url, params=content, headers=headers)
|
||||
assert response.json['result'] == 1
|
||||
|
||||
# known users returned as part of api's full mode:
|
||||
assert len(response.json['known_users']) == 100
|
||||
for user_dict in response.json['known_users']:
|
||||
assert user_dict['first_name'] == 'jim'
|
||||
assert user_dict['last_name'] == 'jam'
|
||||
assert {
|
||||
'uuid',
|
||||
'email',
|
||||
'is_staff',
|
||||
'is_superuser',
|
||||
'email_verified',
|
||||
'ou',
|
||||
'is_active',
|
||||
'deactivation',
|
||||
'modified',
|
||||
}.issubset(set(user_dict.keys()))
|
||||
|
||||
|
||||
def test_user_synchronization_timestamp(app, admin):
|
||||
headers = basic_authorization_header(admin)
|
||||
url = reverse('a2-api-users-synchronization')
|
||||
now = datetime.datetime.now()
|
||||
|
||||
ou = get_default_ou()
|
||||
User = get_user_model()
|
||||
users = []
|
||||
|
||||
for i in range(6):
|
||||
users.append(
|
||||
User.objects.create(
|
||||
first_name='john%s' % i,
|
||||
last_name='doe',
|
||||
username='user%s' % i,
|
||||
email='user%s' % i,
|
||||
ou=ou,
|
||||
)
|
||||
)
|
||||
|
||||
for i, event_name in enumerate(
|
||||
[
|
||||
'manager.user.creation',
|
||||
'manager.user.profile.edit',
|
||||
'manager.user.activation',
|
||||
'manager.user.deactivation',
|
||||
'manager.user.password.change.force',
|
||||
'manager.user.password.change.unforce',
|
||||
]
|
||||
):
|
||||
event_type = EventType.objects.get_for_name(event_name)
|
||||
Event.objects.create(
|
||||
type=event_type,
|
||||
timestamp=now - datetime.timedelta(days=i, hours=1),
|
||||
references=[users[i]],
|
||||
)
|
||||
|
||||
content = {
|
||||
'known_uuids': [user.uuid for user in users],
|
||||
'timestamp': (now - datetime.timedelta(days=3)).isoformat(),
|
||||
}
|
||||
|
||||
response = app.post(url, params=content, headers=headers)
|
||||
|
||||
for user in users[:3]:
|
||||
assert user.uuid in response.json['modified_users_uuids']
|
||||
assert user.uuid not in response.json['unknown_uuids']
|
||||
for user in users[3:]:
|
||||
assert user.uuid not in response.json['modified_users_uuids']
|
||||
assert user.uuid not in response.json['unknown_uuids']
|
||||
|
||||
for user in users[:3]:
|
||||
user.delete()
|
||||
|
||||
content['timestamp'] = (now - datetime.timedelta(days=7)).isoformat()
|
||||
|
||||
response = app.post(url, params=content, headers=headers)
|
||||
|
||||
for user in users[:3]:
|
||||
assert user.uuid not in response.json['modified_users_uuids']
|
||||
assert user.uuid in response.json['unknown_uuids']
|
||||
for user in users[3:]:
|
||||
assert user.uuid in response.json['modified_users_uuids']
|
||||
assert user.uuid not in response.json['unknown_uuids']
|
||||
|
||||
for user in users[3:]:
|
||||
user.delete()
|
||||
|
||||
response = app.post(url, params=content, headers=headers)
|
||||
|
||||
assert not response.json['modified_users_uuids']
|
||||
for user in users:
|
||||
assert user.uuid in response.json['unknown_uuids']
|
||||
|
||||
for user in users[:3]:
|
||||
content['known_uuids'].remove(user.uuid)
|
||||
|
||||
response = app.post(url, params=content, headers=headers)
|
||||
|
||||
assert not response.json['modified_users_uuids']
|
||||
assert len(response.json['unknown_uuids']) == 3
|
||||
for user in users[3:]:
|
||||
assert user.uuid in response.json['unknown_uuids']
|
||||
|
||||
|
||||
def test_api_drf_authentication_class(app, admin, user_ou1, oidc_client):
|
||||
url = '/api/users/%s/' % user_ou1.uuid
|
||||
# test invalid client
|
||||
|
|
|
@ -0,0 +1,171 @@
|
|||
# authentic2 - versatile identity manager
|
||||
# Copyright (C) 2010-2022 Entr'ouvert
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it
|
||||
# under the terms of the GNU Affero General Public License as published
|
||||
# by the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import datetime
|
||||
import random
|
||||
import uuid
|
||||
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.urls import reverse
|
||||
|
||||
from authentic2.a2_rbac.models import SEARCH_OP, Role
|
||||
from authentic2.a2_rbac.utils import get_default_ou
|
||||
from authentic2.apps.journal.models import Event, EventType
|
||||
from authentic2.custom_user.models import User
|
||||
|
||||
from ..utils import basic_authorization_header
|
||||
|
||||
|
||||
def test_basic(app, simple_user):
|
||||
headers = basic_authorization_header(simple_user)
|
||||
uuids = []
|
||||
for _ in range(100):
|
||||
user = User.objects.create(first_name='ben', last_name='dauve')
|
||||
uuids.append(user.uuid)
|
||||
unknown_uuids = [uuid.uuid4().hex for i in range(100)]
|
||||
url = reverse('a2-api-users-synchronization')
|
||||
content = {
|
||||
'known_uuids': uuids + unknown_uuids,
|
||||
}
|
||||
random.shuffle(content['known_uuids'])
|
||||
|
||||
# test permission check
|
||||
response = app.post_json(url, params=content, headers=headers, status=403)
|
||||
r = Role.objects.get_admin_role(
|
||||
ContentType.objects.get_for_model(User), name='role', slug='role', operation=SEARCH_OP
|
||||
)
|
||||
r.members.add(simple_user)
|
||||
response = app.post_json(url, params=content, headers=headers)
|
||||
assert response.json['result'] == 1
|
||||
assert set(response.json['unknown_uuids']) == set(unknown_uuids)
|
||||
|
||||
|
||||
def test_full_known_users(app, admin):
|
||||
headers = basic_authorization_header(admin)
|
||||
uuids = []
|
||||
for _ in range(100):
|
||||
user = User.objects.create(first_name='jim', last_name='jam')
|
||||
uuids.append(user.uuid)
|
||||
unknown_uuids = [uuid.uuid4().hex for i in range(100)]
|
||||
url = reverse('a2-api-users-synchronization')
|
||||
content = {
|
||||
'known_uuids': uuids + unknown_uuids,
|
||||
'full_known_users': 1,
|
||||
}
|
||||
random.shuffle(content['known_uuids'])
|
||||
response = app.post_json(url, params=content, headers=headers)
|
||||
assert response.json['result'] == 1
|
||||
|
||||
# known users returned as part of api's full mode:
|
||||
assert len(response.json['known_users']) == 100
|
||||
for user_dict in response.json['known_users']:
|
||||
assert user_dict['first_name'] == 'jim'
|
||||
assert user_dict['last_name'] == 'jam'
|
||||
assert {
|
||||
'uuid',
|
||||
'email',
|
||||
'is_staff',
|
||||
'is_superuser',
|
||||
'email_verified',
|
||||
'ou',
|
||||
'is_active',
|
||||
'deactivation',
|
||||
'modified',
|
||||
}.issubset(set(user_dict.keys()))
|
||||
|
||||
|
||||
def test_timestamp(app, admin):
|
||||
headers = basic_authorization_header(admin)
|
||||
url = reverse('a2-api-users-synchronization')
|
||||
now = datetime.datetime.now()
|
||||
|
||||
ou = get_default_ou()
|
||||
users = []
|
||||
|
||||
for i in range(6):
|
||||
users.append(
|
||||
User.objects.create(
|
||||
first_name='john%s' % i,
|
||||
last_name='doe',
|
||||
username='user%s' % i,
|
||||
email='user%s' % i,
|
||||
ou=ou,
|
||||
)
|
||||
)
|
||||
|
||||
for i, event_name in enumerate(
|
||||
[
|
||||
'manager.user.creation',
|
||||
'manager.user.profile.edit',
|
||||
'manager.user.activation',
|
||||
'manager.user.deactivation',
|
||||
'manager.user.password.change.force',
|
||||
'manager.user.password.change.unforce',
|
||||
]
|
||||
):
|
||||
event_type = EventType.objects.get_for_name(event_name)
|
||||
Event.objects.create(
|
||||
type=event_type,
|
||||
timestamp=now - datetime.timedelta(days=i, hours=1),
|
||||
references=[users[i]],
|
||||
)
|
||||
|
||||
content = {
|
||||
'known_uuids': [user.uuid for user in users],
|
||||
'timestamp': (now - datetime.timedelta(days=3)).isoformat(),
|
||||
}
|
||||
|
||||
response = app.post(url, params=content, headers=headers)
|
||||
|
||||
for user in users[:3]:
|
||||
assert user.uuid in response.json['modified_users_uuids']
|
||||
assert user.uuid not in response.json['unknown_uuids']
|
||||
for user in users[3:]:
|
||||
assert user.uuid not in response.json['modified_users_uuids']
|
||||
assert user.uuid not in response.json['unknown_uuids']
|
||||
|
||||
for user in users[:3]:
|
||||
user.delete()
|
||||
|
||||
content['timestamp'] = (now - datetime.timedelta(days=7)).isoformat()
|
||||
|
||||
response = app.post(url, params=content, headers=headers)
|
||||
|
||||
for user in users[:3]:
|
||||
assert user.uuid not in response.json['modified_users_uuids']
|
||||
assert user.uuid in response.json['unknown_uuids']
|
||||
for user in users[3:]:
|
||||
assert user.uuid in response.json['modified_users_uuids']
|
||||
assert user.uuid not in response.json['unknown_uuids']
|
||||
|
||||
for user in users[3:]:
|
||||
user.delete()
|
||||
|
||||
response = app.post(url, params=content, headers=headers)
|
||||
|
||||
assert not response.json['modified_users_uuids']
|
||||
for user in users:
|
||||
assert user.uuid in response.json['unknown_uuids']
|
||||
|
||||
for user in users[:3]:
|
||||
content['known_uuids'].remove(user.uuid)
|
||||
|
||||
response = app.post(url, params=content, headers=headers)
|
||||
|
||||
assert not response.json['modified_users_uuids']
|
||||
assert len(response.json['unknown_uuids']) == 3
|
||||
for user in users[3:]:
|
||||
assert user.uuid in response.json['unknown_uuids']
|
Loading…
Reference in New Issue