This repository has been archived on 2023-08-12. You can view files and clone it, but cannot push or open issues or pull requests.
authentic2-gnm/tests/test_commands.py

77 lines
2.9 KiB
Python

import random
import uuid
import httmock
import pytest
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.utils import crypto
from authentic2_auth_oidc.models import OIDCAccount
from django.contrib.auth import get_user_model
from django.core.management import call_command
@pytest.mark.parametrize('deletion_number,deletion_valid', [(2, True), (5, True), (10, False)])
def test_user_synchronization_deletion_threshold(
    db, app, admin, settings, capsys, oidc_provider, deletion_number, deletion_valid
):
    """Check how the ``sync-cut`` command reacts to the number of reported deletions.

    One hundred users, each linked to an ``OIDCAccount`` on ``oidc_provider``,
    are created.  The remote provider is mocked so that:

    * ``POST /users/synchronization/`` reports ``deletion_number`` randomly
      chosen account subs as ``unknown_uuids``;
    * ``GET /users/...`` returns 20 randomly chosen users as modified.

    When ``deletion_valid`` is true the reported accounts are expected to be
    removed; otherwise the command is expected to print a
    "deletion ratio is abnormally high" message and leave all accounts alone.
    """
    User = get_user_model()

    # Loop-invariant inputs of the deterministic sub computation.
    secret_key = settings.SECRET_KEY.encode('utf-8')
    sector_identifier = 'cut'
    default_ou = get_default_ou()

    for i in range(100):
        user = User.objects.create(
            first_name='John%s' % i,
            last_name='Doe%s' % i,
            username='john.doe.%s' % i,
            # The index was previously never interpolated, leaving every user
            # with the literal address 'john.doe.%s@ad.dre.ss'.
            email='john.doe.%s@ad.dre.ss' % i,
            ou=default_ou,
        )
        identifier = uuid.UUID(user.uuid).bytes
        sub = crypto.aes_base64url_deterministic_encrypt(
            secret_key, identifier, sector_identifier
        ).decode('utf-8')
        OIDCAccount.objects.create(user=user, provider=oidc_provider, sub=sub)

    @httmock.urlmatch(
        netloc=r'cut\.base\.provider',
        path=r'^/users/synchronization/$',
        method='POST',
    )
    def synchronization_post_deletion_response(url, request):
        # Pretend the provider no longer knows `deletion_number` random accounts.
        headers = {'content-type': 'application/json'}
        content = {
            'unknown_uuids': [
                account.sub
                for account in random.sample(list(OIDCAccount.objects.all()), deletion_number)
            ]
        }
        return httmock.response(status_code=200, headers=headers, content=content, request=request)

    @httmock.urlmatch(
        netloc=r'cut\.base\.provider',
        path=r'^/users/*',
        method='GET',
    )
    def synchronization_get_modified_response(url, request):
        # Pretend 20 random users were modified on the provider side.
        headers = {'content-type': 'application/json'}
        content = {'results': [user.to_json() for user in random.sample(list(User.objects.all()), 20)]}
        return httmock.response(status_code=200, headers=headers, content=content, request=request)

    # Same nesting order as before: the POST mock is the outer context, the
    # GET mock the inner one.
    with httmock.HTTMock(synchronization_post_deletion_response), httmock.HTTMock(
        synchronization_get_modified_response
    ):
        call_command('sync-cut', '--delta', '300', '-v1')

    out, err = capsys.readouterr()
    assert not err
    if deletion_valid:
        # existing users check
        assert OIDCAccount.objects.count() == 100 - deletion_number
    else:
        assert 'deletion ratio is abnormally high' in out
        assert OIDCAccount.objects.count() == 100

    # users update
    assert 'got 20 users' in out  # fixme: further testing in dedicated unit test