198 lines
6.7 KiB
Python
198 lines
6.7 KiB
Python
import mock
|
|
import pytest
|
|
|
|
from django.conf import settings
|
|
from django.core.management import call_command
|
|
|
|
from mandayejs.mandaye.models import UserCredentials
|
|
|
|
from utils import create_user, create_credentials, get_uuid, get_user
|
|
|
|
# Module-level pytest marker: applies the `django_db` mark to every test
# collected from this module.
pytestmark = pytest.mark.django_db
|
|
|
|
|
|
# ENCRYPTION/DECRYPTION
|
|
def test_encryption(credentials):
    """Check that decrypting ``credentials`` exposes the clear-text password.

    ``credentials`` is injected by pytest (its definition is outside this
    file); the test only relies on it offering ``decrypt()`` returning a
    mapping with a ``'password'`` key.
    """
    assert credentials.decrypt().get('password') == 'john password'
|
|
|
|
|
|
@pytest.fixture(params=['command_ldap', 'command_csv'])
def command(request, command_ldap, command_csv):
    """Yield, in turn, each of the two command fixtures named in ``params``.

    Both underlying fixtures are requested up front; ``request.param`` then
    selects which one is handed to the test.
    """
    by_name = {
        'command_ldap': command_ldap,
        'command_csv': command_csv,
    }
    return by_name.get(request.param)
|
|
|
|
|
|
@mock.patch('mandayejs.mandaye.management.commands.migrate-users.get_idps')
def test_command_migrate_users(mocked_idps, command):
    """Run the parametrized management command and verify stored credentials.

    ``get_idps`` is patched to iterate over
    ``settings.MELLON_IDENTITY_PROVIDERS``. When the command options carry a
    truthy ``'ldap'`` entry, exactly three credentials are expected for the
    ``ldap_user*`` last names, each with password ``password_<last_name>``.
    Otherwise four credentials are expected outside those last names, each
    with a password equal to its ``#username`` login-info value.
    """
    mocked_idps.return_value = iter(settings.MELLON_IDENTITY_PROVIDERS)
    call_command(command.name, *command.args, **command.opts)

    ldap_last_names = ['ldap_user1', 'ldap_user2', 'ldap_user3']

    if command.opts.get('ldap'):
        ldap_credentials = UserCredentials.objects.filter(
            user__last_name__in=ldap_last_names)
        assert len(ldap_credentials) == 3

        for entry in ldap_credentials:
            stored_password = entry.to_login_info(decrypt=True)['#password']
            assert stored_password == 'password_{}'.format(entry.user.last_name)
        return

    other_credentials = UserCredentials.objects.all().exclude(
        user__last_name__in=ldap_last_names)
    assert len(other_credentials) == 4

    for entry in other_credentials:
        stored_password = entry.to_login_info(decrypt=True)['#password']
        assert stored_password == entry.to_login_info()['#username']
|
|
|
|
|
|
# MANDAYE API
|
|
|
|
# GET
|
|
def test_api_get(client, url):
    """GET ``url`` and check the status against the client's session state.

    A client whose session holds any value is expected to receive 200 with
    blank ``login``/``password`` data; a client with an empty session is
    expected to receive 403.
    """
    response = client.get(url)

    # The session is inspected only after the request has been issued.
    expected_status = 200 if client.session.values() else 403
    assert response.status_code == expected_status

    if expected_status == 200:
        assert {'login': '', 'password': ''} == response.data
|
|
|
|
|
|
@pytest.mark.skipif(settings.HOBO is None, reason="hobo is required")
def test_signed_api_get(client_service, url_signed):
    """GET the signed URL and check the status against its declared origin.

    Skipped when ``settings.HOBO`` is ``None``. An origin of ``'testserver'``
    is expected to yield 200 with blank ``login``/``password`` data; any
    other origin is expected to yield 403.
    """
    response = client_service.get(url_signed.url)

    from_testserver = url_signed.orig == 'testserver'
    expected_status = 200 if from_testserver else 403
    assert response.status_code == expected_status

    if expected_status == 200:
        assert {'login': '', 'password': ''} == response.data
|
|
|
|
|
|
# POST
|
|
@mock.patch('mandayejs.mandaye.api.exec_phantom')
def test_api_post(mock_phantomjs_result, client, url, payload):
    """POST ``payload`` to ``url`` with ``exec_phantom`` mocked out.

    With session data present the expected statuses are 200 (success) and
    401 (failure); with an empty session both are 403. The payload whose
    ``name_id_content`` is ``'12345'`` exercises the failure path; any other
    payload exercises the success path with the mock returning
    ``{"result": "ok"}``.
    """
    if client.session.values():
        expected = {'success': 200, 'failure': 401}
    else:
        expected = {'success': 403, 'failure': 403}

    if payload.get('name_id_content') != '12345':
        # Success path: the mocked phantomjs call reports an OK result.
        mock_phantomjs_result.return_value = {"result": "ok"}

        response = client.post(url, data=payload, format='json')
        assert response.status_code == expected['success']

        # Session is re-read after the request before checking stored data.
        if client.session.values():
            josh = get_user(username='77777')
            stored = UserCredentials.objects.filter(user=josh)[0]

            assert stored.to_login_info()['#login'] == 'josh'
            assert stored.to_login_info(decrypt=True)['#password'] == 'josh password'
        return

    # Failure path: the mock's return value is left unconfigured.
    response = client.post(url, data=payload, format='json')
    assert response.status_code == expected['failure']

    if client.session.values():
        kevin = get_user(first_name='kevin')
        assert kevin.username == payload['name_id_content']

        kevin_uuid = get_uuid(name_id=payload['name_id_content'])
        assert kevin_uuid.name_id == '12345'
|
|
|
|
|
|
@pytest.mark.skipif(settings.HOBO is None, reason="hobo is required")
@mock.patch('mandayejs.mandaye.api.exec_phantom')
def test_signed_api_post(mock_phantomjs_result, client_service, url_signed, payload):
    """POST ``payload`` to the signed URL with ``exec_phantom`` mocked out.

    Skipped when ``settings.HOBO`` is ``None``. For an origin of
    ``'testserver'`` the expected statuses are 200 (success) and 401
    (failure); for any other origin both are 403. The payload whose
    ``name_id_content`` is ``'12345'`` exercises the failure path; any other
    payload exercises the success path with the mock returning
    ``{"result": "ok"}``.
    """
    from_testserver = url_signed.orig == 'testserver'
    if from_testserver:
        expected = {'success': 200, 'failure': 401}
    else:
        expected = {'success': 403, 'failure': 403}

    if payload.get('name_id_content') != '12345':
        # Success path: the mocked phantomjs call reports an OK result.
        mock_phantomjs_result.return_value = {"result": "ok"}

        response = client_service.post(url_signed.url, data=payload, format='json')
        assert response.status_code == expected['success']

        if from_testserver:
            josh = get_user(username='77777')
            stored = UserCredentials.objects.filter(user=josh)[0]

            assert stored.to_login_info()['#login'] == 'josh'
            assert stored.to_login_info(decrypt=True)['#password'] == 'josh password'
        return

    # Failure path: the mock's return value is left unconfigured.
    response = client_service.post(url_signed.url, data=payload, format='json')
    assert response.status_code == expected['failure']

    if from_testserver:
        kevin = get_user(first_name='kevin')
        assert kevin.username == payload['name_id_content']

        kevin_uuid = get_uuid(name_id=payload['name_id_content'])
        assert kevin_uuid.name_id == '12345'
|
|
|
|
|
|
# DELETE
|
|
def test_api_delete(client, url):
    """DELETE credentials through ``url`` for a user without and with them.

    With session data present the expected statuses are 200 (success) and
    404 (failure); with an empty session both are 403. The first request
    targets name id ``'12345'`` for user kevin, who has no credentials; the
    second targets ``'77777'`` after credentials were created for josh.
    """
    if client.session.values():
        expected = {'success': 200, 'failure': 404}
    else:
        expected = {'success': 403, 'failure': 403}

    # kevin has no stored credentials, so the delete is expected to fail.
    kevin = get_user(first_name='kevin')
    assert not UserCredentials.objects.filter(user=kevin).exists()
    missing_payload = {'name_id_content': '12345'}
    response = client.delete(url, data=missing_payload, format='json')
    assert response.status_code == expected['failure']

    # josh gets credentials first, so the delete has something to remove.
    josh = create_user(username='77777')
    josh_secrets = {'login': 'josh', 'password': 'josh password'}
    create_credentials(josh, josh_secrets)

    assert UserCredentials.objects.filter(user=josh).exists()
    existing_payload = {'name_id_content': '77777'}
    response = client.delete(url, data=existing_payload, format='json')
    assert response.status_code == expected['success']

    # Session is re-read after the requests before checking the removal.
    if client.session.values():
        assert not UserCredentials.objects.filter(user=josh).exists()
|
|
|
|
|
|
@pytest.mark.skipif(settings.HOBO is None, reason="hobo is required")
def test_signed_api_delete(client_service, url_signed):
    """DELETE credentials through the signed URL, without and with them.

    Skipped when ``settings.HOBO`` is ``None``. For an origin of
    ``'testserver'`` the expected statuses are 200 (success) and 404
    (failure); for any other origin both are 403. The first request targets
    name id ``'12345'`` for user kevin, who has no credentials; the second
    targets ``'77777'`` after credentials were created for josh.
    """
    from_testserver = url_signed.orig == 'testserver'
    if from_testserver:
        expected = {'success': 200, 'failure': 404}
    else:
        expected = {'success': 403, 'failure': 403}

    # kevin has no stored credentials, so the delete is expected to fail.
    kevin = get_user(first_name='kevin')
    assert not UserCredentials.objects.filter(user=kevin).exists()
    missing_payload = {'name_id_content': '12345'}
    response = client_service.delete(url_signed.url, data=missing_payload, format='json')
    assert response.status_code == expected['failure']

    # josh gets credentials first, so the delete has something to remove.
    josh = create_user(username='77777')
    josh_secrets = {'login': 'josh', 'password': 'josh password'}
    create_credentials(josh, josh_secrets)

    assert UserCredentials.objects.filter(user=josh).exists()
    existing_payload = {'name_id_content': '77777'}
    response = client_service.delete(url_signed.url, data=existing_payload, format='json')
    assert response.status_code == expected['success']

    if from_testserver:
        assert not UserCredentials.objects.filter(user=josh).exists()
|