182 lines
5.2 KiB
Python
182 lines
5.2 KiB
Python
# authentic2 - versatile identity manager
|
|
# Copyright (C) 2010-2022 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import datetime
|
|
import random
|
|
import uuid
|
|
|
|
import pytest
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.urls import reverse
|
|
|
|
from authentic2.a2_rbac.models import SEARCH_OP, Role
|
|
from authentic2.apps.journal.models import Event, EventType
|
|
from authentic2.custom_user.models import User
|
|
|
|
URL = '/api/users/synchronization/'
|
|
|
|
|
|
@pytest.fixture
|
|
def user(simple_user):
|
|
role = Role.objects.get_admin_role(
|
|
ContentType.objects.get_for_model(User), name='role', slug='role', operation=SEARCH_OP
|
|
)
|
|
role.members.add(simple_user)
|
|
return simple_user
|
|
|
|
|
|
@pytest.fixture
|
|
def app(app, user):
|
|
app.set_authorization(('Basic', (user.username, user.username)))
|
|
return app
|
|
|
|
|
|
@pytest.fixture
|
|
def users(db):
|
|
return [User.objects.create(first_name='john', last_name='doe') for _ in range(10)]
|
|
|
|
|
|
@pytest.fixture
|
|
def uuids(users):
|
|
return [user.uuid for user in users]
|
|
|
|
|
|
def test_url(app, simple_user):
|
|
# URL is publikc api, check it
|
|
assert URL == reverse('a2-api-users-synchronization')
|
|
|
|
|
|
def test_authentication_required(app):
|
|
app.set_authorization(None)
|
|
app.post_json(URL, status=401)
|
|
|
|
|
|
def test_permission_required(app, user):
|
|
user.roles.clear()
|
|
app.post_json(URL, status=403)
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def unknown_uuids():
|
|
return [uuid.uuid4().hex for i in range(10)]
|
|
|
|
|
|
@pytest.fixture
|
|
def payload(uuids, unknown_uuids):
|
|
content = {
|
|
'known_uuids': uuids + unknown_uuids,
|
|
}
|
|
random.shuffle(content['known_uuids'])
|
|
return content
|
|
|
|
|
|
def test_basic(app, payload, unknown_uuids):
|
|
response = app.post_json(URL, params=payload)
|
|
assert response.json['result'] == 1
|
|
assert set(response.json['unknown_uuids']) == set(unknown_uuids)
|
|
|
|
|
|
def test_full_known_users(app, payload):
|
|
payload['full_known_users'] = 1
|
|
response = app.post_json(URL, params=payload)
|
|
assert response.json['result'] == 1
|
|
|
|
# known users returned as part of api's full mode:
|
|
assert len(response.json['known_users']) == 10
|
|
for user_dict in response.json['known_users']:
|
|
assert user_dict['first_name'] == 'john'
|
|
assert user_dict['last_name'] == 'doe'
|
|
assert {
|
|
'uuid',
|
|
'email',
|
|
'is_staff',
|
|
'is_superuser',
|
|
'email_verified',
|
|
'ou',
|
|
'is_active',
|
|
'deactivation',
|
|
'modified',
|
|
}.issubset(set(user_dict.keys()))
|
|
|
|
|
|
def test_timestamp(app, users):
|
|
now = datetime.datetime.now()
|
|
users = users[:6]
|
|
|
|
for i, event_name in enumerate(
|
|
[
|
|
'manager.user.creation',
|
|
'manager.user.profile.edit',
|
|
'manager.user.activation',
|
|
'manager.user.deactivation',
|
|
'manager.user.password.change.force',
|
|
'manager.user.password.change.unforce',
|
|
]
|
|
):
|
|
event_type = EventType.objects.get_for_name(event_name)
|
|
Event.objects.create(
|
|
type=event_type,
|
|
timestamp=now - datetime.timedelta(days=i, hours=1),
|
|
references=[users[i]],
|
|
)
|
|
|
|
content = {
|
|
'known_uuids': [user.uuid for user in users],
|
|
'timestamp': (now - datetime.timedelta(days=3)).isoformat(),
|
|
}
|
|
|
|
response = app.post(URL, params=content)
|
|
|
|
for user in users[:3]:
|
|
assert user.uuid in response.json['modified_users_uuids']
|
|
assert user.uuid not in response.json['unknown_uuids']
|
|
for user in users[3:]:
|
|
assert user.uuid not in response.json['modified_users_uuids']
|
|
assert user.uuid not in response.json['unknown_uuids']
|
|
|
|
for user in users[:3]:
|
|
user.delete()
|
|
|
|
content['timestamp'] = (now - datetime.timedelta(days=7)).isoformat()
|
|
|
|
response = app.post(URL, params=content)
|
|
|
|
for user in users[:3]:
|
|
assert user.uuid not in response.json['modified_users_uuids']
|
|
assert user.uuid in response.json['unknown_uuids']
|
|
for user in users[3:]:
|
|
assert user.uuid in response.json['modified_users_uuids']
|
|
assert user.uuid not in response.json['unknown_uuids']
|
|
|
|
for user in users[3:]:
|
|
user.delete()
|
|
|
|
response = app.post(URL, params=content)
|
|
|
|
assert not response.json['modified_users_uuids']
|
|
for user in users:
|
|
assert user.uuid in response.json['unknown_uuids']
|
|
|
|
for user in users[:3]:
|
|
content['known_uuids'].remove(user.uuid)
|
|
|
|
response = app.post(URL, params=content)
|
|
|
|
assert not response.json['modified_users_uuids']
|
|
assert len(response.json['unknown_uuids']) == 3
|
|
for user in users[3:]:
|
|
assert user.uuid in response.json['unknown_uuids']
|