172 lines
5.7 KiB
Python
172 lines
5.7 KiB
Python
# authentic2 - versatile identity manager
|
|
# Copyright (C) 2010-2022 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import datetime
|
|
import random
|
|
import uuid
|
|
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.urls import reverse
|
|
|
|
from authentic2.a2_rbac.models import SEARCH_OP, Role
|
|
from authentic2.a2_rbac.utils import get_default_ou
|
|
from authentic2.apps.journal.models import Event, EventType
|
|
from authentic2.custom_user.models import User
|
|
|
|
from ..utils import basic_authorization_header
|
|
|
|
|
|
def test_basic(app, simple_user):
    """Check the permission gate and the unknown-uuid report of the synchronization API.

    The request carries the uuids of 100 freshly created users mixed with 100
    random uuids.  It must be rejected with a 403 until ``simple_user`` is a
    member of an admin role granting ``SEARCH_OP`` on ``User``; afterwards the
    response must report exactly the random uuids as unknown.
    """
    auth = basic_authorization_header(simple_user)
    existing = [User.objects.create(first_name='ben', last_name='dauve').uuid for _ in range(100)]
    missing = [uuid.uuid4().hex for _ in range(100)]
    endpoint = reverse('a2-api-users-synchronization')

    # Shuffle so that existing and random uuids are interleaved in the payload.
    mixed = existing + missing
    random.shuffle(mixed)
    payload = {'known_uuids': mixed}

    # test permission check: no search role yet, so the call must be forbidden
    app.post_json(endpoint, params=payload, headers=auth, status=403)

    user_content_type = ContentType.objects.get_for_model(User)
    search_role = Role.objects.get_admin_role(
        user_content_type, name='role', slug='role', operation=SEARCH_OP
    )
    search_role.members.add(simple_user)

    body = app.post_json(endpoint, params=payload, headers=auth).json
    assert body['result'] == 1
    assert set(body['unknown_uuids']) == set(missing)
|
|
|
|
|
|
def test_full_known_users(app, admin):
    """Check the ``full_known_users`` mode of the synchronization API.

    With ``full_known_users`` set to 1 and a payload mixing 100 existing user
    uuids with 100 random ones, the response must contain a ``known_users``
    list of 100 entries, each carrying the created users' names and at least
    the set of attribute keys checked below.
    """
    auth = basic_authorization_header(admin)
    existing = [User.objects.create(first_name='jim', last_name='jam').uuid for _ in range(100)]
    missing = [uuid.uuid4().hex for _ in range(100)]
    endpoint = reverse('a2-api-users-synchronization')

    mixed = existing + missing
    payload = {
        'known_uuids': mixed,
        'full_known_users': 1,
    }
    # ``mixed`` is the very list stored in the payload, so this shuffles it in place.
    random.shuffle(mixed)

    body = app.post_json(endpoint, params=payload, headers=auth).json
    assert body['result'] == 1

    # known users returned as part of api's full mode:
    known_users = body['known_users']
    assert len(known_users) == 100

    # Keys every returned user dictionary must expose (extra keys are allowed).
    required_keys = {
        'uuid',
        'email',
        'is_staff',
        'is_superuser',
        'email_verified',
        'ou',
        'is_active',
        'deactivation',
        'modified',
    }
    for entry in known_users:
        assert entry['first_name'] == 'jim'
        assert entry['last_name'] == 'jam'
        assert required_keys <= set(entry.keys())
|
|
|
|
|
|
def test_timestamp(app, admin):
    """Check how the ``timestamp`` parameter filters ``modified_users_uuids``.

    Six users are created; user number ``i`` gets one journal event dated
    ``i`` days and one hour before ``now``.  The test then posts the users'
    uuids with various timestamps, deleting users along the way, and checks
    which uuids come back as modified and which as unknown.
    """
    auth = basic_authorization_header(admin)
    endpoint = reverse('a2-api-users-synchronization')
    now = datetime.datetime.now()

    ou = get_default_ou()
    users = [
        User.objects.create(
            first_name='john%s' % index,
            last_name='doe',
            username='user%s' % index,
            email='user%s' % index,
            ou=ou,
        )
        for index in range(6)
    ]
    first_half, second_half = users[:3], users[3:]

    event_names = [
        'manager.user.creation',
        'manager.user.profile.edit',
        'manager.user.activation',
        'manager.user.deactivation',
        'manager.user.password.change.force',
        'manager.user.password.change.unforce',
    ]
    # One event per user, each one day older than the previous one.
    for days_ago, (target, event_name) in enumerate(zip(users, event_names)):
        Event.objects.create(
            type=EventType.objects.get_for_name(event_name),
            timestamp=now - datetime.timedelta(days=days_ago, hours=1),
            references=[target],
        )

    payload = {
        'known_uuids': [user.uuid for user in users],
        'timestamp': (now - datetime.timedelta(days=3)).isoformat(),
    }

    def sync():
        # Post the current state of ``payload`` and return the decoded JSON body.
        return app.post(endpoint, params=payload, headers=auth).json

    # Threshold three days back: only the first three users have a newer event.
    body = sync()
    for user in first_half:
        assert user.uuid in body['modified_users_uuids']
        assert user.uuid not in body['unknown_uuids']
    for user in second_half:
        assert user.uuid not in body['modified_users_uuids']
        assert user.uuid not in body['unknown_uuids']

    for user in first_half:
        user.delete()

    # Threshold seven days back: every event is newer, but the first three
    # users no longer exist and must be reported as unknown instead.
    payload['timestamp'] = (now - datetime.timedelta(days=7)).isoformat()
    body = sync()
    for user in first_half:
        assert user.uuid not in body['modified_users_uuids']
        assert user.uuid in body['unknown_uuids']
    for user in second_half:
        assert user.uuid in body['modified_users_uuids']
        assert user.uuid not in body['unknown_uuids']

    for user in second_half:
        user.delete()

    # All users deleted: nothing is modified, everything is unknown.
    body = sync()
    assert not body['modified_users_uuids']
    for user in users:
        assert user.uuid in body['unknown_uuids']

    # Stop sending the first three uuids: only the remaining three are reported.
    for user in first_half:
        payload['known_uuids'].remove(user.uuid)

    body = sync()
    assert not body['modified_users_uuids']
    assert len(body['unknown_uuids']) == 3
    for user in second_half:
        assert user.uuid in body['unknown_uuids']
|