This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
mandayejs/tests/tests.py

410 lines
12 KiB
Python

import mock
import pytest
from django.conf import settings
from django.core.urlresolvers import reverse
from django.contrib.auth.models import User
from django.core.management import call_command
from rest_framework.test import APIClient
from rest_framework.settings import api_settings
from mandayejs.applications import get_app_settings
from mandayejs.mandaye.models import UserCredentials
from mandayejs.mandaye import utils
from mellon.models import UserSAMLIdentifier
# hobo is an optional dependency. When either import fails, ``hobo`` is bound
# to None; the signed-API tests use that value in their skip condition.
try:
    import hobo
    from hobo import signature
except ImportError:
    hobo = None
# Apply the ``django_db`` mark to every test of this module.
pytestmark = pytest.mark.django_db

# Settings overrides, applied once when the module is imported.
settings.MELLON_IDENTITY_PROVIDERS = [
    {'METADATA_URL': 'http://testsever.org/idp/saml2/metadata'},
]

settings.SITE_APP = 'mandayejs.applications.Test'

settings.SECRET_KEY = 'od5cei4aeveel8dui4lei2ou9ahsei2A'

# A single known service, "testserver", whose secret is the same literal as
# SECRET_KEY above.
settings.KNOWN_SERVICES = {
    "service": {
        "testserver": {
            "title": "testserver",
            "orig": "testserver",
            "url": "http://testserver",
            "secret": "od5cei4aeveel8dui4lei2ou9ahsei2A",
            "verif_orig": "testserver",
        },
    },
}

settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS = (
    'hobo.rest_authentication.AnonymousAdminServiceUser'
)
# REST FRAMEWORK SETTINGS
# Session and basic authentication are always enabled; the hobo
# authentication class is appended only when hobo could be imported.
AUTHENTICATION_CLASSES = (
    'rest_framework.authentication.SessionAuthentication',
    'rest_framework.authentication.BasicAuthentication',
)
if hobo:
    AUTHENTICATION_CLASSES += (
        'hobo.rest_authentication.PublikAuthentication',
    )

api_settings.user_settings.update({
    'DEFAULT_AUTHENTICATION_CLASSES': AUTHENTICATION_CLASSES,
})
# ENCRYPTION/DECRYPTION
def create_user(**kwargs):
    """Get or create a ``User`` matching ``kwargs`` and set its password.

    The ``password`` keyword is removed from ``kwargs`` before the lookup;
    when it is missing or empty the ``username`` value (if any) is used as
    the password instead. Returns the user instance.
    """
    secret = kwargs.pop('password', None)
    if not secret:
        secret = kwargs.get('username')
    account, _ = User.objects.get_or_create(**kwargs)
    if secret:
        account.set_password(secret)
        account.save()
    return account
def get_user(**kwargs):
    """Return the ``User`` matching ``kwargs``, or None when none exists."""
    try:
        return User.objects.get(**kwargs)
    except User.DoesNotExist:
        return None
def get_uuid(**kwargs):
    """Return the ``UserSAMLIdentifier`` matching ``kwargs``, or None."""
    try:
        return UserSAMLIdentifier.objects.get(**kwargs)
    except UserSAMLIdentifier.DoesNotExist:
        return None
def create_credentials(user, credentials):
    """Get or create the ``UserCredentials`` for ``user`` with ``credentials`` as locators."""
    lookup = UserCredentials.objects.get_or_create(user=user, locators=credentials)
    # get_or_create returns an (object, created) pair; only the object is needed.
    return lookup[0]
@pytest.fixture
def admin(db):
    """Return the ``admin`` user: active, staff and superuser."""
    attributes = dict(
        username='admin',
        first_name='admin',
        last_name='admin',
        email='admin@example.net',
        is_superuser=True,
        is_staff=True,
        is_active=True,
    )
    return create_user(**attributes)
@pytest.fixture
def user_john(db):
    """Return the user whose username is ``john``."""
    john = create_user(username='john')
    return john
@pytest.fixture
def cred_john(db, user_john):
    """Return credentials for ``user_john`` (login ``john``)."""
    locators = {
        "login": "john",
        "password": "john password",
    }
    return create_credentials(user_john, locators)
@pytest.fixture(params=['cred_john'])
def credentials(request, cred_john):
    """Return the credentials fixture whose name is ``request.param``."""
    by_name = {
        'cred_john': cred_john,
    }
    return by_name.get(request.param)
def test_encryption(credentials):
    """Decrypting the stored credentials yields the original password."""
    clear = credentials.decrypt()
    password = clear.get('password')
    assert password == 'john password'
# MIGRATION COMMAND
def cmd(*args, **kwargs):
    """Describe one invocation of the ``migrate-users`` management command.

    Returns a new class named ``Cmd`` carrying three class attributes:
    ``name`` (always ``'migrate-users'``), ``args`` (the positional
    arguments, as a tuple) and ``opts`` (the keyword arguments, as a dict).
    """
    attributes = {
        'name': 'migrate-users',
        'args': args,
        'opts': kwargs,
    }
    # Built and returned directly: no local variable shadows this function's name.
    return type('Cmd', (object,), attributes)
@pytest.fixture
def command_ldap():
    """Return the command description for ``tests/ldap_users.txt`` with ``ldap=True``."""
    source = 'tests/ldap_users.txt'
    return cmd(source, ldap=True)
@pytest.fixture
def command_csv():
    """Return the command description for ``tests/csv_users.csv`` with ``csv=True``."""
    source = 'tests/csv_users.csv'
    return cmd(source, csv=True)
@pytest.fixture(params=['command_ldap', 'command_csv'])
def command(request, command_ldap, command_csv):
    """Return the command fixture whose name is ``request.param``."""
    by_name = {
        'command_ldap': command_ldap,
        'command_csv': command_csv,
    }
    return by_name.get(request.param)
def test_command_migrate_users(command):
    """Run the migration command, then check the credentials it stored.

    With the ``ldap`` option, three credentials are expected for the
    ``ldap_user*`` last names, each with password ``password_<last_name>``.
    Otherwise, four credentials are expected outside those last names, each
    with a password equal to its ``#username`` entry.
    """
    ldap_names = ['ldap_user1', 'ldap_user2', 'ldap_user3']
    call_command(command.name, *command.args, **command.opts)

    if command.opts.get('ldap'):
        imported = UserCredentials.objects.filter(user__last_name__in=ldap_names)
        assert len(imported) == 3
        for entry in imported:
            actual = entry.to_login_info(decrypt=True)['#password']
            assert actual == 'password_{}'.format(entry.user.last_name)
    else:
        imported = UserCredentials.objects.all().exclude(user__last_name__in=ldap_names)
        assert len(imported) == 4
        for entry in imported:
            actual = entry.to_login_info(decrypt=True)['#password']
            assert actual == entry.to_login_info()['#username']
# MANDAYE API
@pytest.fixture
def url():
    """Return the URL reversed from the ``api`` route name."""
    api_url = reverse('api')
    return api_url
def create_signed_url(secret, orig):
    """Sign the ``api`` URL with ``secret`` and append an ``orig`` parameter.

    Returns a new class named ``signed_url`` with two class attributes:
    ``url`` (the signed URL followed by ``&orig=<orig>``) and ``orig``.
    """
    signed = signature.sign_url(reverse('api'), secret)
    # The orig parameter is appended after the URL has been signed.
    attributes = {
        'url': signed + '&orig={}'.format(orig),
        'orig': orig,
    }
    return type('signed_url', (object,), attributes)
@pytest.fixture
def url_signed_testserver_service():
    """Return a URL signed with ``settings.SECRET_KEY`` for orig ``testserver``."""
    secret = settings.SECRET_KEY
    return create_signed_url(secret, 'testserver')
@pytest.fixture
def url_signed_unknown_service():
    """Return a URL signed with ``settings.SECRET_KEY`` for orig ``lol``."""
    secret = settings.SECRET_KEY
    return create_signed_url(secret, 'lol')
@pytest.fixture(params=['url_signed_testserver_service', 'url_signed_unknown_service'])
def url_signed(request, url_signed_testserver_service, url_signed_unknown_service):
    """Return the signed-URL fixture whose name is ``request.param``."""
    by_name = {
        'url_signed_testserver_service': url_signed_testserver_service,
        'url_signed_unknown_service': url_signed_unknown_service,
    }
    return by_name.get(request.param)
@pytest.fixture
def client_service():
    """Return a fresh ``APIClient``; it is used with the signed URLs."""
    api_client = APIClient()
    return api_client
@pytest.fixture
def client_anonymous():
    """Return a fresh ``APIClient`` on which no login is performed."""
    api_client = APIClient()
    return api_client
@pytest.fixture
def client_logged(admin):
    """Return an ``APIClient`` after calling ``login`` with admin/admin."""
    api_client = APIClient()
    login_kwargs = {'username': 'admin', 'password': 'admin'}
    api_client.login(**login_kwargs)
    return api_client
@pytest.fixture(params=['client_anonymous', 'client_logged'])
def client(request, client_anonymous, client_logged):
    """Return the client fixture whose name is ``request.param``."""
    by_name = {
        'client_anonymous': client_anonymous,
        'client_logged': client_logged,
    }
    return by_name.get(request.param)
@pytest.fixture
def kevin_payload():
    """Return the API payload for kevin (``name_id_content`` ``12345``)."""
    locators = {
        'login': 'fake',
        'password': 'fake',
    }
    return {
        'name_id_content': '12345',
        'email': 'kevin@fake.com',
        'first_name': 'kevin',
        'last_name': 'fake',
        'locators': locators,
    }
@pytest.fixture
def josh_payload():
    """Return the API payload for josh (``name_id_content`` ``77777``)."""
    locators = {
        'login': 'josh',
        'password': 'josh password',
    }
    return {
        'name_id_content': '77777',
        'email': 'josh@loking.com',
        'first_name': 'josh',
        'last_name': 'loking',
        'locators': locators,
    }
@pytest.fixture(params=['kevin_payload', 'josh_payload'])
def payload(request, kevin_payload, josh_payload):
    """Return the payload fixture whose name is ``request.param``."""
    by_name = {
        'kevin_payload': kevin_payload,
        'josh_payload': josh_payload,
    }
    return by_name.get(request.param)
# GET
def test_api_get(client, url):
    """GET on the API: 200 with empty locators when ``client.session`` is truthy, else 403."""
    response = client.get(url)
    # The session is inspected after the request, as the expected status depends on it.
    expected_status = 200 if client.session else 403
    assert response.status_code == expected_status
    if expected_status != 200:
        return
    assert {'login': '', 'password': ''} == response.data
@pytest.mark.skipif(hobo is None, reason="hobo is required")
def test_signed_api_get(client_service, url_signed):
    """GET on a signed URL: 200 with empty locators for orig ``testserver``, else 403.

    Skipped when hobo could not be imported.
    """
    response = client_service.get(url_signed.url)
    if url_signed.orig == 'testserver':
        status_code = 200
    else:
        status_code = 403
    assert response.status_code == status_code
    if status_code == 200:
        assert {'login': '', 'password': ''} == response.data
# POST
@mock.patch('mandayejs.mandaye.api.exec_phantom')
def test_api_post(mock_phantomjs_result, client, url, payload):
    """POST on the API with ``exec_phantom`` mocked.

    The payload whose ``name_id_content`` is ``12345`` is posted without
    configuring the mock's return value and must get the 'failure' status.
    Any other payload is posted with the mock returning ``{"result": "ok"}``
    and must get the 'success' status. When ``client.session`` is truthy,
    the created user and identifier or credentials are checked as well.
    """
    if client.session:
        expected = {'success': 200, 'failure': 401}
    else:
        expected = {'success': 403, 'failure': 403}

    if payload.get('name_id_content') != '12345':
        mock_phantomjs_result.return_value = {"result": "ok"}
        response = client.post(url, data=payload, format='json')
        assert response.status_code == expected['success']
        if client.session:
            josh = get_user(username='77777')
            stored = UserCredentials.objects.filter(user=josh)[0]
            assert stored.to_login_info()['#login'] == 'josh'
            assert stored.to_login_info(decrypt=True)['#password'] == 'josh password'
        return

    response = client.post(url, data=payload, format='json')
    assert response.status_code == expected['failure']
    if client.session:
        kevin = get_user(first_name='kevin')
        assert kevin.username == payload['name_id_content']
        identifier = get_uuid(name_id=payload['name_id_content'])
        assert identifier.name_id == '12345'
@pytest.mark.skipif(hobo is None, reason="hobo is required")
@mock.patch('mandayejs.mandaye.api.exec_phantom')
def test_signed_api_post(mock_phantomjs_result, client_service, url_signed, payload):
    """POST on a signed URL with ``exec_phantom`` mocked.

    Skipped when hobo could not be imported. For orig ``testserver`` the
    expected statuses are 200 (success) and 401 (failure); for any other
    orig both are 403. The payload whose ``name_id_content`` is ``12345`` is
    posted without configuring the mock and must get the 'failure' status.
    Any other payload is posted with the mock returning ``{"result": "ok"}``
    and must get the 'success' status.
    """
    known_service = url_signed.orig == 'testserver'
    if known_service:
        status_code = {'success': 200, 'failure': 401}
    else:
        status_code = {'success': 403, 'failure': 403}

    if payload.get('name_id_content') == '12345':
        response = client_service.post(url_signed.url, data=payload, format='json')
        assert response.status_code == status_code['failure']
        if known_service:
            kevin = get_user(first_name='kevin')
            assert kevin.username == payload['name_id_content']
            kevin_uuid = get_uuid(name_id=payload['name_id_content'])
            assert kevin_uuid.name_id == '12345'
    else:
        mock_phantomjs_result.return_value = {"result": "ok"}
        response = client_service.post(url_signed.url, data=payload, format='json')
        assert response.status_code == status_code['success']
        if known_service:
            josh = get_user(username='77777')
            josh_creds = UserCredentials.objects.filter(user=josh)[0]
            assert josh_creds.to_login_info()['#login'] == 'josh'
            assert josh_creds.to_login_info(decrypt=True)['#password'] == 'josh password'
# DELETE
def test_api_delete(client, url):
    """DELETE on the API.

    Expected statuses are 200 (success) and 404 (failure) when
    ``client.session`` is truthy, and 403 for both otherwise. A first DELETE
    for ``name_id_content`` ``12345`` must get the 'failure' status. After
    creating user ``77777`` with credentials, a DELETE for ``77777`` must get
    the 'success' status and, with a truthy session, remove the credentials.
    """
    if client.session:
        status_code = {'success': 200, 'failure': 404}
    else:
        status_code = {'success': 403, 'failure': 403}

    kevin = get_user(first_name='kevin')
    assert not UserCredentials.objects.filter(user=kevin).exists()
    response = client.delete(url, data={'name_id_content': '12345'}, format='json')
    assert response.status_code == status_code['failure']

    josh = create_user(username='77777')
    create_credentials(josh, {
        'login': 'josh',
        'password': 'josh password',
    })
    assert UserCredentials.objects.filter(user=josh).exists()
    response = client.delete(url, data={'name_id_content': '77777'}, format='json')
    assert response.status_code == status_code['success']
    if client.session:
        assert not UserCredentials.objects.filter(user=josh).exists()
@pytest.mark.skipif(hobo is None, reason="hobo is required")
def test_signed_api_delete(client_service, url_signed):
    """DELETE on a signed URL.

    Skipped when hobo could not be imported. For orig ``testserver`` the
    expected statuses are 200 (success) and 404 (failure); for any other
    orig both are 403. A first DELETE for ``name_id_content`` ``12345`` must
    get the 'failure' status. After creating user ``77777`` with
    credentials, a DELETE for ``77777`` must get the 'success' status and,
    for orig ``testserver``, remove the credentials.
    """
    known_service = url_signed.orig == 'testserver'
    if known_service:
        status_code = {'success': 200, 'failure': 404}
    else:
        status_code = {'success': 403, 'failure': 403}

    kevin = get_user(first_name='kevin')
    assert not UserCredentials.objects.filter(user=kevin).exists()
    response = client_service.delete(
        url_signed.url, data={'name_id_content': '12345'}, format='json')
    assert response.status_code == status_code['failure']

    josh = create_user(username='77777')
    create_credentials(josh, {
        'login': 'josh',
        'password': 'josh password',
    })
    assert UserCredentials.objects.filter(user=josh).exists()
    response = client_service.delete(
        url_signed.url, data={'name_id_content': '77777'}, format='json')
    assert response.status_code == status_code['success']
    if known_service:
        assert not UserCredentials.objects.filter(user=josh).exists()