tests: test profile distinction in synchronization endpoint (#63157)

This commit is contained in:
Paul Marillonnet 2022-03-28 15:17:30 +02:00
parent 241ceaaccd
commit 4939516808
1 changed files with 133 additions and 1 deletions

View File

@ -17,18 +17,22 @@
import datetime
import json
import random
import uuid
from unittest import mock
import django
import pytest
from django.contrib.auth import get_user_model
from django.contrib.auth.hashers import check_password
from django.contrib.contenttypes.models import ContentType
from django.core import mail
from django.urls import reverse
from django.utils.encoding import force_str
from django.utils.text import slugify
from requests.models import Response
from authentic2.a2_rbac.models import SEARCH_OP
from authentic2.a2_rbac.models import OrganizationalUnit as OU
from authentic2.a2_rbac.models import Role
from authentic2.a2_rbac.utils import get_default_ou
@ -38,7 +42,13 @@ from authentic2.models import APIClient, Attribute, AttributeValue, AuthorizedRo
from authentic2.utils.misc import good_next_url
from authentic2_idp_cas.models import Service as CASService
from ..utils import assert_event, basic_authorization_header, get_link_from_mail, login
from ..utils import (
assert_event,
basic_authorization_header,
basic_authorization_oidc_client,
get_link_from_mail,
login,
)
pytestmark = pytest.mark.django_db
@ -1137,6 +1147,128 @@ def test_register_ou_no_email_validation(settings, app, admin, django_user_model
assert user.check_password(password)
def test_user_synchronization(app, simple_user):
headers = basic_authorization_header(simple_user)
uuids = []
for _ in range(100):
user = User.objects.create(first_name='ben', last_name='dauve')
uuids.append(user.uuid)
unknown_uuids = [uuid.uuid4().hex for i in range(100)]
url = reverse('a2-api-users-synchronization')
content = {
'known_uuids': uuids + unknown_uuids,
}
random.shuffle(content['known_uuids'])
response = app.post_json(url, params=content, headers=headers, status=403)
# give custom_user.search_user permission to user
r = Role.objects.get_admin_role(
ContentType.objects.get_for_model(User), name='role', slug='role', operation=SEARCH_OP
)
r.members.add(simple_user)
response = app.post_json(url, params=content, headers=headers)
assert response.json['result'] == 1
assert set(response.json['unknown_uuids']) == set(unknown_uuids)
def test_user_synchronization_modification_profile(app):
from authentic2_idp_oidc.models import OIDCClient
from authentic2_idp_oidc.utils import make_sub
uuids = []
users = []
precreate_dt = datetime.datetime.now()
profile_type = ProfileType.objects.create(name='Referee', slug='referee')
oidc_client = OIDCClient.objects.create(
name='Synchronized client',
slug='synchronized-client',
sector_identifier_uri='https://sync-client.example.org/',
identifier_policy=OIDCClient.POLICY_PAIRWISE_REVERSIBLE,
has_api_access=True,
authorization_mode=OIDCClient.AUTHORIZATION_MODE_BY_SERVICE,
)
headers = basic_authorization_oidc_client(oidc_client)
for i in range(100):
user = User.objects.create(first_name='john', last_name='doe', email='john.doe.%s@ad.dre.ss' % i)
uuids.append(user.uuid)
profile = None
if i % 2:
profile = Profile.objects.create(
profile_type=profile_type,
user=user,
email=f'referee-{i}@ad.dre.ss',
identifier=f'referee-{i}',
data={'foo': i},
)
users.append(
(
user,
profile,
)
)
url = reverse('a2-api-users-synchronization')
# first attempt with no profile information
uuids = [make_sub(oidc_client, user) for user, _ in users]
content = {
'known_uuids': uuids,
}
response = app.post_json(url, params=content, headers=headers)
assert response
assert not response.json['unknown_uuids']
# this time subs with profile info
uuids = [make_sub(oidc_client, user, profile) for user, profile in users]
content = {
'known_uuids': uuids,
}
response = app.post_json(url, params=content, headers=headers)
assert response
assert not response.json['unknown_uuids']
response = app.get(
'/api/users/?modified__gt=%s' % precreate_dt.strftime('%Y-%m-%dT%H:%M:%S'), headers=headers
)
assert len(response.json['results']) == 100
def test_user_synchronization_full(app, admin):
headers = basic_authorization_header(admin)
uuids = []
for _ in range(100):
user = User.objects.create(first_name='jim', last_name='jam')
uuids.append(user.uuid)
unknown_uuids = [uuid.uuid4().hex for i in range(100)]
url = reverse('a2-api-users-synchronization')
content = {
'known_uuids': uuids + unknown_uuids,
'full_known_users': 1,
}
random.shuffle(content['known_uuids'])
response = app.post_json(url, params=content, headers=headers)
assert response.json['result'] == 1
# known users returned as part of api's full mode:
assert len(response.json['known_users']) == 100
for user_dict in response.json['known_users']:
assert user_dict['first_name'] == 'jim'
assert user_dict['last_name'] == 'jam'
assert {
'uuid',
'email',
'is_staff',
'is_superuser',
'email_verified',
'ou',
'is_active',
'deactivation',
'modified',
}.issubset(set(user_dict.keys()))
def test_api_drf_authentication_class(app, admin, user_ou1, oidc_client):
url = '/api/users/%s/' % user_ou1.uuid
# test invalid client