Provisionning synchrone pour les web-services utilisant l'authentification Publik avec DRF (#59135) #102
|
@ -93,6 +93,8 @@ class KnownServices(FileBaseSettingsLoader):
|
||||||
'secondary': service.get('secondary'),
|
'secondary': service.get('secondary'),
|
||||||
'template_name': service.get('template_name'),
|
'template_name': service.get('template_name'),
|
||||||
}
|
}
|
||||||
|
if 'saml-idp-metadata-url' in service:
|
||||||
|
service_data['saml-idp-metadata-url'] = service['saml-idp-metadata-url']
|
||||||
|
|
||||||
if service.get('secondary') and (
|
if service.get('secondary') and (
|
||||||
service.get('variables') and service.get('variables').get('ou-label')
|
service.get('variables') and service.get('variables').get('ou-label')
|
||||||
|
|
|
@ -16,12 +16,18 @@
|
||||||
|
|
||||||
import hashlib
|
import hashlib
|
||||||
import logging
|
import logging
|
||||||
|
from urllib.parse import quote, urljoin
|
||||||
|
|
||||||
|
import requests
|
||||||
|
from django.conf import settings
|
||||||
|
from django.contrib.auth import get_user_model
|
||||||
from django.contrib.auth.models import Group
|
from django.contrib.auth.models import Group
|
||||||
from django.db import IntegrityError
|
from django.db import IntegrityError
|
||||||
from django.db.models.query import Q
|
from django.db.models import Q
|
||||||
from django.db.transaction import atomic
|
from django.db.transaction import atomic
|
||||||
|
from mellon.models import Issuer
|
||||||
|
|
||||||
|
from hobo import signature
|
||||||
from hobo.agent.common.models import Role, UserExtraAttributes
|
from hobo.agent.common.models import Role, UserExtraAttributes
|
||||||
from hobo.multitenant.utils import provision_user_groups
|
from hobo.multitenant.utils import provision_user_groups
|
||||||
|
|
||||||
|
@ -50,6 +56,66 @@ def user_str(user):
|
||||||
return s
|
return s
|
||||||
|
|
||||||
|
|
||||||
|
def get_idp_service():
|
||||||
|
idp_services = list((settings.KNOWN_SERVICES or {}).get('authentic', {}).values())
|
||||||
|
return (idp_services or [None])[0]
|
||||||
|
|
||||||
|
|
||||||
|
def get_issuer(entity_id):
|
||||||
|
issuer, _ = Issuer.objects.get_or_create(entity_id=entity_id)
|
||||||
|
return issuer
|
||||||
|
|
||||||
|
|
||||||
|
def provision_user(entity_id, o, tries=0):
|
||||||
|
updated = set()
|
||||||
|
attributes = {
|
||||||
|
'first_name': o['first_name'][:30],
|
||||||
|
'last_name': o['last_name'][:150],
|
||||||
|
'email': o['email'][:254],
|
||||||
|
'username': o['uuid'][:150],
|
||||||
|
'is_superuser': o['is_superuser'],
|
||||||
|
'is_staff': o['is_superuser'],
|
||||||
|
'is_active': o.get('is_active', True),
|
||||||
|
}
|
||||||
|
|
||||||
|
excluded_attrs = ['roles', 'password']
|
||||||
|
extra_attributes = {k: v for k, v in o.items() if k not in excluded_attrs}
|
||||||
|
|
||||||
|
User = get_user_model()
|
||||||
|
user = get_user_from_name_id(name_id=o['uuid'], entity_id=entity_id) or User()
|
||||||
|
if user.pk:
|
||||||
|
user_extra_attributes = UserExtraAttributes.objects.get(user=user)
|
||||||
|
else:
|
||||||
|
user_extra_attributes = UserExtraAttributes(user=user, data=extra_attributes)
|
||||||
|
|
||||||
|
for key, value in attributes.items():
|
||||||
|
if getattr(user, key) != value:
|
||||||
|
setattr(user, key, value)
|
||||||
|
updated.add(key)
|
||||||
|
|
||||||
|
for key in extra_attributes:
|
||||||
|
if extra_attributes[key] != user_extra_attributes.data.get(key):
|
||||||
|
updated.add(key)
|
||||||
|
|
||||||
|
if not user.id: # user is new
|
||||||
|
issuer = get_issuer(entity_id)
|
||||||
|
try:
|
||||||
|
with atomic(savepoint=False):
|
||||||
|
user.save()
|
||||||
|
user.saml_identifiers.create(issuer=issuer, name_id=o['uuid'])
|
||||||
|
user_extra_attributes.save()
|
||||||
|
logger.info('provisionned new user %s', user_str(user))
|
||||||
|
except IntegrityError:
|
||||||
|
if tries > 0:
|
||||||
|
raise
|
||||||
|
return provision_user(user, o, tries=tries + 1)
|
||||||
|
elif updated:
|
||||||
|
user.save()
|
||||||
|
user_extra_attributes.save()
|
||||||
|
logger.info('updated user %s(%s)', user_str(user), updated)
|
||||||
|
return user
|
||||||
|
|
||||||
|
|
||||||
class NotificationProcessing:
|
class NotificationProcessing:
|
||||||
@classmethod
|
@classmethod
|
||||||
def check_valid_notification(cls, notification):
|
def check_valid_notification(cls, notification):
|
||||||
|
@ -80,71 +146,21 @@ class NotificationProcessing:
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def provision_user(cls, issuer, action, data, full=False):
|
def provision_user(cls, issuer, action, data, full=False):
|
||||||
from django.contrib.auth import get_user_model
|
assert not full # provisionning all users is dangerous, we prefer deprovision
|
||||||
from mellon.models import UserSAMLIdentifier
|
|
||||||
from mellon.models_utils import get_issuer
|
|
||||||
|
|
||||||
User = get_user_model()
|
User = get_user_model()
|
||||||
|
|
||||||
assert not full # provisionning all users is dangerous, we prefer deprovision
|
|
||||||
uuids = set()
|
uuids = set()
|
||||||
for o in data:
|
|
||||||
try:
|
|
||||||
with atomic():
|
|
||||||
if action == 'provision':
|
|
||||||
new = False
|
|
||||||
updated = set()
|
|
||||||
attributes = {
|
|
||||||
'first_name': o['first_name'][:30],
|
|
||||||
'last_name': o['last_name'][:150],
|
|
||||||
'email': o['email'][:254],
|
|
||||||
'username': o['uuid'][:150],
|
|
||||||
'is_superuser': o['is_superuser'],
|
|
||||||
'is_staff': o['is_superuser'],
|
|
||||||
'is_active': o.get('is_active', True),
|
|
||||||
}
|
|
||||||
assert cls.check_valid_user(o)
|
|
||||||
try:
|
|
||||||
mellon_user = UserSAMLIdentifier.objects.get(
|
|
||||||
issuer__entity_id=issuer, name_id=o['uuid']
|
|
||||||
)
|
|
||||||
user = mellon_user.user
|
|
||||||
except UserSAMLIdentifier.DoesNotExist:
|
|
||||||
try:
|
|
||||||
user = User.objects.get(
|
|
||||||
Q(username=o['uuid'][:30]) | Q(username=o['uuid'][:150])
|
|
||||||
)
|
|
||||||
except User.DoesNotExist:
|
|
||||||
# temp user object
|
|
||||||
user = User.objects.create(**attributes)
|
|
||||||
new = True
|
|
||||||
saml_issuer = get_issuer(issuer)
|
|
||||||
mellon_user = UserSAMLIdentifier.objects.create(
|
|
||||||
user=user, issuer=saml_issuer, name_id=o['uuid']
|
|
||||||
)
|
|
||||||
excluded_attrs = ['roles', 'password']
|
|
||||||
|
|
||||||
UserExtraAttributes.objects.update_or_create(
|
for o in data:
|
||||||
user=user,
|
if action == 'provision':
|
||||||
defaults={'data': {k: v for k, v in o.items() if k not in excluded_attrs}},
|
assert cls.check_valid_user(o)
|
||||||
)
|
user = provision_user(issuer, o)
|
||||||
if new:
|
role_uuids = [role['uuid'] for role in o.get('roles', [])]
|
||||||
logger.info('provisionned new user %s', user_str(user))
|
provision_user_groups(user, role_uuids)
|
||||||
else:
|
elif action == 'deprovision':
|
||||||
for key, value in attributes.items():
|
assert 'uuid' in o
|
||||||
if getattr(user, key) != value:
|
uuids.add(o['uuid'])
|
||||||
setattr(user, key, value)
|
|
||||||
updated.add(key)
|
|
||||||
if updated:
|
|
||||||
user.save()
|
|
||||||
logger.info('updated user %s(%s)', user_str(user), updated)
|
|
||||||
role_uuids = [role['uuid'] for role in o.get('roles', [])]
|
|
||||||
provision_user_groups(user, role_uuids)
|
|
||||||
elif action == 'deprovision':
|
|
||||||
assert 'uuid' in o
|
|
||||||
uuids.add(o['uuid'])
|
|
||||||
except IntegrityError:
|
|
||||||
raise TryAgain
|
|
||||||
if (full and action == 'provision') or (action == 'deprovision'):
|
if (full and action == 'provision') or (action == 'deprovision'):
|
||||||
if action == 'deprovision':
|
if action == 'deprovision':
|
||||||
qs = User.objects.filter(saml_identifiers__name_id__in=uuids)
|
qs = User.objects.filter(saml_identifiers__name_id__in=uuids)
|
||||||
|
@ -269,3 +285,58 @@ class NotificationProcessing:
|
||||||
except TryAgain:
|
except TryAgain:
|
||||||
continue
|
continue
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
||||||
|
def get_user_from_name_id(name_id, entity_id=None, raise_on_missing=False):
|
||||||
|
User = get_user_model()
|
||||||
|
try:
|
||||||
|
user = User.objects.get(
|
||||||
|
saml_identifiers__name_id=name_id, saml_identifiers__issuer__entity_id=entity_id
|
||||||
|
)
|
||||||
|
except User.DoesNotExist:
|
||||||
|
try:
|
||||||
|
user = User.objects.get(Q(username=name_id[:30]) | Q(username=name_id[:150]))
|
||||||
|
except User.DoesNotExist:
|
||||||
|
user = None
|
||||||
|
if not user:
|
||||||
|
if raise_on_missing:
|
||||||
|
raise User.DoesNotExist
|
||||||
|
return user
|
||||||
|
|
||||||
|
|
||||||
|
class ProvisionningTemporaryError(RuntimeError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def get_or_create_user_from_name_id(name_id, raise_on_missing=False):
|
||||||
|
User = get_user_model()
|
||||||
|
|
||||||
|
user = get_user_from_name_id(name_id=name_id)
|
||||||
|
if user:
|
||||||
|
return user
|
||||||
|
|
||||||
|
idp_service = get_idp_service()
|
||||||
|
if not idp_service:
|
||||||
|
if raise_on_missing:
|
||||||
|
raise User.DoesNotExist('no idp service defined')
|
||||||
|
return None
|
||||||
|
|
||||||
|
entity_id = idp_service['saml-idp-metadata-url']
|
||||||
|
issuer = get_issuer(entity_id)
|
||||||
|
users_api_url = urljoin(idp_service['url'], 'api/users/%s/' % quote(name_id, safe=''))
|
||||||
|
try:
|
||||||
|
response = requests.get(signature.sign_url(users_api_url, idp_service['secret']), timeout=5)
|
||||||
|
response.raise_for_status()
|
||||||
|
except requests.HTTPError as e:
|
||||||
|
if e.response.status_code == 404:
|
||||||
|
if raise_on_missing:
|
||||||
|
raise User.DoesNotExist
|
||||||
|
return None
|
||||||
|
if raise_on_missing:
|
||||||
|
raise ProvisionningTemporaryError(str(e) or repr(e))
|
||||||
|
return None
|
||||||
|
except requests.RequestException as e:
|
||||||
|
if raise_on_missing:
|
||||||
|
raise ProvisionningTemporaryError(str(e) or repr(e))
|
||||||
|
return None
|
||||||
|
return provision_user(issuer.entity_id, o=response.json())
|
||||||
|
|
|
@ -9,6 +9,7 @@ from django.utils.module_loading import import_string
|
||||||
from rest_framework import authentication, exceptions, status
|
from rest_framework import authentication, exceptions, status
|
||||||
|
|
||||||
from hobo import signature
|
from hobo import signature
|
||||||
|
from hobo.provisionning.utils import ProvisionningTemporaryError, get_or_create_user_from_name_id
|
||||||
from hobo.requests_wrapper import Requests
|
from hobo.requests_wrapper import Requests
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -82,10 +83,15 @@ class APIClientUser:
|
||||||
|
|
||||||
class PublikAuthenticationFailed(exceptions.APIException):
|
class PublikAuthenticationFailed(exceptions.APIException):
|
||||||
status_code = status.HTTP_401_UNAUTHORIZED
|
status_code = status.HTTP_401_UNAUTHORIZED
|
||||||
default_code = 'invalid-signature'
|
|
||||||
|
|
||||||
def __init__(self, code):
|
def __init__(self, code, description=None):
|
||||||
self.detail = {'err': 1, 'err_desc': code}
|
self.detail = {'err': 1, 'err_class': code, 'err_desc': code}
|
||||||
|
if description:
|
||||||
|
self.detail['err_desc'] = description
|
||||||
|
|
||||||
|
|
||||||
|
class PublikAuthenticationTemporaryFailure(PublikAuthenticationFailed):
|
||||||
|
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
|
||||||
|
|
||||||
|
|
||||||
class PublikAuthentication(authentication.BaseAuthentication):
|
class PublikAuthentication(authentication.BaseAuthentication):
|
||||||
|
@ -111,8 +117,10 @@ class PublikAuthentication(authentication.BaseAuthentication):
|
||||||
|
|
||||||
elif UserSAMLIdentifier:
|
elif UserSAMLIdentifier:
|
||||||
try:
|
try:
|
||||||
return UserSAMLIdentifier.objects.get(name_id=name_id).user
|
return get_or_create_user_from_name_id(name_id, raise_on_missing=True)
|
||||||
except UserSAMLIdentifier.DoesNotExist:
|
except ProvisionningTemporaryError as e:
|
||||||
|
raise PublikAuthenticationTemporaryFailure('idp-not-reachable', str(e))
|
||||||
|
except User.DoesNotExist:
|
||||||
raise PublikAuthenticationFailed('user-not-found')
|
raise PublikAuthenticationFailed('user-not-found')
|
||||||
else:
|
else:
|
||||||
raise PublikAuthenticationFailed('no-usable-model')
|
raise PublikAuthenticationFailed('no-usable-model')
|
||||||
|
@ -124,7 +132,6 @@ class PublikAuthentication(authentication.BaseAuthentication):
|
||||||
pass
|
pass
|
||||||
if hasattr(settings, 'HOBO_ANONYMOUS_SERVICE_USER_CLASS'):
|
if hasattr(settings, 'HOBO_ANONYMOUS_SERVICE_USER_CLASS'):
|
||||||
klass = import_string(settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS)
|
klass = import_string(settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS)
|
||||||
self.logger.info('anonymous signature validated')
|
|
||||||
return klass()
|
return klass()
|
||||||
raise PublikAuthenticationFailed('no-user-for-orig')
|
raise PublikAuthenticationFailed('no-user-for-orig')
|
||||||
|
|
||||||
|
@ -150,9 +157,8 @@ class PublikAuthentication(authentication.BaseAuthentication):
|
||||||
), 'signature.check_url should never return False with raise_on_error'
|
), 'signature.check_url should never return False with raise_on_error'
|
||||||
except signature.SignatureError as e:
|
except signature.SignatureError as e:
|
||||||
self.logger.warning('publik rest-framework-authentication failed: %s', e)
|
self.logger.warning('publik rest-framework-authentication failed: %s', e)
|
||||||
raise PublikAuthenticationFailed(str(e))
|
raise PublikAuthenticationFailed('invalid-signature', str(e))
|
||||||
user = self.resolve_user(request)
|
user = self.resolve_user(request)
|
||||||
self.logger.info('user authenticated with signature %s', user)
|
|
||||||
return (user, None)
|
return (user, None)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -144,7 +144,8 @@ def test_response(rf, settings, tenant):
|
||||||
|
|
||||||
response = view(request)
|
response = view(request)
|
||||||
assert response.status_code == 401
|
assert response.status_code == 401
|
||||||
assert response.data == {'err': 1, 'err_desc': 'user-not-found'}
|
assert response.data['err'] == 1
|
||||||
|
assert response.data['err_desc'] == 'user-not-found'
|
||||||
|
|
||||||
# Service authentication, wrong timestamp
|
# Service authentication, wrong timestamp
|
||||||
request = rf.get(
|
request = rf.get(
|
||||||
|
@ -153,10 +154,11 @@ def test_response(rf, settings, tenant):
|
||||||
|
|
||||||
response = view(request)
|
response = view(request)
|
||||||
assert response.status_code == 401
|
assert response.status_code == 401
|
||||||
assert response.data == {
|
assert response.data['err'] == 1
|
||||||
'err': 1,
|
assert (
|
||||||
'err_desc': "invalid timestamp, time data 'xxx' does not match format '%Y-%m-%dT%H:%M:%SZ'",
|
response.data['err_desc']
|
||||||
}
|
== "invalid timestamp, time data 'xxx' does not match format '%Y-%m-%dT%H:%M:%SZ'"
|
||||||
|
)
|
||||||
|
|
||||||
# Service authentication
|
# Service authentication
|
||||||
request = rf.get(signature.sign_url('/?orig=zzz', secret_key))
|
request = rf.get(signature.sign_url('/?orig=zzz', secret_key))
|
||||||
|
@ -168,4 +170,5 @@ def test_response(rf, settings, tenant):
|
||||||
del settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS
|
del settings.HOBO_ANONYMOUS_SERVICE_USER_CLASS
|
||||||
response = view(request)
|
response = view(request)
|
||||||
assert response.status_code == 401
|
assert response.status_code == 401
|
||||||
assert response.data == {'err': 1, 'err_desc': 'no-user-for-orig'}
|
assert response.data['err'] == 1
|
||||||
|
assert response.data['err_desc'] == 'no-user-for-orig'
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
import pytest
|
import pytest
|
||||||
|
from tenant_schemas.utils import tenant_context
|
||||||
|
|
||||||
from hobo.multitenant.apps import clear_tenants_settings
|
from hobo.multitenant.apps import clear_tenants_settings
|
||||||
|
|
||||||
|
@ -70,6 +71,7 @@ def make_tenant(tmp_path, transactional_db, settings, request):
|
||||||
'service-id': 'authentic',
|
'service-id': 'authentic',
|
||||||
'base_url': 'http://other.example.net',
|
'base_url': 'http://other.example.net',
|
||||||
'legacy_urls': [{'base_url': 'http://olda2.example.net'}],
|
'legacy_urls': [{'base_url': 'http://olda2.example.net'}],
|
||||||
|
'saml-idp-metadata-url': 'https://other.example.net/idp/saml2/metadata',
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
'slug': 'another',
|
'slug': 'another',
|
||||||
|
@ -84,6 +86,7 @@ def make_tenant(tmp_path, transactional_db, settings, request):
|
||||||
'service-id': 'combo',
|
'service-id': 'combo',
|
||||||
'template_name': '...portal-user...',
|
'template_name': '...portal-user...',
|
||||||
'base_url': 'http://portal-user.example.net',
|
'base_url': 'http://portal-user.example.net',
|
||||||
|
'secret_key': 'abcdefg',
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
|
@ -120,7 +123,10 @@ def tenants(make_tenant):
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def tenant(make_tenant):
|
def tenant(make_tenant):
|
||||||
clear_tenants_settings()
|
clear_tenants_settings()
|
||||||
return make_tenant('tenant.example.net')
|
tenant = make_tenant('tenant.example.net')
|
||||||
|
|
||||||
|
with tenant_context(tenant):
|
||||||
|
yield tenant
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
|
|
|
@ -1,4 +1,20 @@
|
||||||
from hobo.provisionning.utils import NotificationProcessing
|
import json
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import requests
|
||||||
|
from httmock import HTTMock, urlmatch
|
||||||
|
from mellon.models import Issuer
|
||||||
|
from rest_framework import permissions
|
||||||
|
from rest_framework.response import Response
|
||||||
|
from rest_framework.views import APIView
|
||||||
|
|
||||||
|
from hobo import rest_authentication, signature
|
||||||
|
from hobo.provisionning.utils import (
|
||||||
|
NotificationProcessing,
|
||||||
|
ProvisionningTemporaryError,
|
||||||
|
get_or_create_user_from_name_id,
|
||||||
|
get_user_from_name_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_truncate_role_name():
|
def test_truncate_role_name():
|
||||||
|
@ -20,3 +36,105 @@ def test_truncate_role_name():
|
||||||
assert len(truncated) == max_length
|
assert len(truncated) == max_length
|
||||||
assert truncated not in seen
|
assert truncated not in seen
|
||||||
seen.add(truncated)
|
seen.add(truncated)
|
||||||
|
|
||||||
|
|
||||||
|
@urlmatch()
|
||||||
|
def request_exception(url, request):
|
||||||
|
raise requests.ConnectionError
|
||||||
|
|
||||||
|
|
||||||
|
NAME_ID = '1234' * 8
|
||||||
|
|
||||||
|
|
||||||
|
@urlmatch(path='/api/users/')
|
||||||
|
def user_payload(url, request):
|
||||||
|
return {
|
||||||
|
'status_code': 200,
|
||||||
|
'content': json.dumps(
|
||||||
|
{
|
||||||
|
'uuid': NAME_ID,
|
||||||
|
'first_name': 'John',
|
||||||
|
'last_name': 'Doe',
|
||||||
|
'email': 'john.doe@example.net',
|
||||||
|
'is_superuser': False,
|
||||||
|
'is_active': True,
|
||||||
|
}
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@urlmatch(path='/api/users/')
|
||||||
|
def error_404(url, request):
|
||||||
|
return {
|
||||||
|
'status_code': 404,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_or_create_user_from_name_id(tenant, django_user_model):
|
||||||
|
Issuer.objects.create(entity_id='https://idp.examle.net/idp/saml2/metadata')
|
||||||
|
|
||||||
|
assert get_user_from_name_id(NAME_ID) is None
|
||||||
|
with pytest.raises(django_user_model.DoesNotExist):
|
||||||
|
get_user_from_name_id(NAME_ID, raise_on_missing=True)
|
||||||
|
|
||||||
|
with HTTMock(request_exception):
|
||||||
|
assert get_or_create_user_from_name_id(NAME_ID) is None
|
||||||
|
with pytest.raises(ProvisionningTemporaryError):
|
||||||
|
get_or_create_user_from_name_id(NAME_ID, raise_on_missing=True)
|
||||||
|
|
||||||
|
with HTTMock(error_404):
|
||||||
|
assert get_or_create_user_from_name_id(NAME_ID) is None
|
||||||
|
with pytest.raises(django_user_model.DoesNotExist):
|
||||||
|
get_or_create_user_from_name_id(NAME_ID, raise_on_missing=True)
|
||||||
|
|
||||||
|
with HTTMock(user_payload):
|
||||||
|
user = get_or_create_user_from_name_id(NAME_ID)
|
||||||
|
assert user is not None
|
||||||
|
assert user.first_name == 'John'
|
||||||
|
assert user.username == NAME_ID
|
||||||
|
assert user.saml_identifiers.get().name_id == NAME_ID
|
||||||
|
assert django_user_model.objects.count() == 1
|
||||||
|
|
||||||
|
|
||||||
|
class DummyAPIView(APIView):
|
||||||
|
authentication_classes = (rest_authentication.PublikAuthentication,)
|
||||||
|
permission_classes = (permissions.IsAuthenticated,)
|
||||||
|
|
||||||
|
def get(self, request, format=None):
|
||||||
|
return Response({'err': 0})
|
||||||
|
|
||||||
|
|
||||||
|
def test_rest_authentication_provisionning_ok(tenant, settings, rf, django_user_model):
|
||||||
|
key = settings.KNOWN_SERVICES['combo']['another2']['secret']
|
||||||
|
request = rf.get(signature.sign_url('/api/misc/?param1=2&NameID=%s&orig=portal-user.example.net', key))
|
||||||
|
|
||||||
|
view = DummyAPIView.as_view()
|
||||||
|
|
||||||
|
with HTTMock(user_payload):
|
||||||
|
response = view(request)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.data == {'err': 0}
|
||||||
|
user = django_user_model.objects.get()
|
||||||
|
assert user.first_name == 'John'
|
||||||
|
assert user.username == NAME_ID
|
||||||
|
assert user.saml_identifiers.get().name_id == NAME_ID
|
||||||
|
|
||||||
|
|
||||||
|
def test_rest_authentication_provisionning_nok(tenant, settings, rf):
|
||||||
|
key = settings.KNOWN_SERVICES['combo']['another2']['secret']
|
||||||
|
request = rf.get(signature.sign_url('/api/misc/?param1=2&NameID=abcd&orig=portal-user.example.net', key))
|
||||||
|
|
||||||
|
view = DummyAPIView.as_view()
|
||||||
|
|
||||||
|
with HTTMock(request_exception):
|
||||||
|
response = view(request)
|
||||||
|
assert response.status_code == 500
|
||||||
|
assert response.data['err'] == 1
|
||||||
|
assert response.data['err_class'] == 'idp-not-reachable'
|
||||||
|
assert 'ConnectionError' in response.data['err_desc']
|
||||||
|
|
||||||
|
with HTTMock(error_404):
|
||||||
|
response = view(request)
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert response.data['err'] == 1
|
||||||
|
assert response.data['err_class'] == 'user-not-found'
|
||||||
|
|
|
@ -194,6 +194,7 @@ def test_known_services(tenants, settings):
|
||||||
'saml-sp-metadata-url',
|
'saml-sp-metadata-url',
|
||||||
'provisionning-url',
|
'provisionning-url',
|
||||||
'secondary',
|
'secondary',
|
||||||
|
'saml-idp-metadata-url',
|
||||||
} == authentic_other_keys
|
} == authentic_other_keys
|
||||||
assert (
|
assert (
|
||||||
settings.KNOWN_SERVICES['authentic']['other']['url'] == hobo_json['services'][2]['base_url']
|
settings.KNOWN_SERVICES['authentic']['other']['url'] == hobo_json['services'][2]['base_url']
|
||||||
|
|
|
@ -44,14 +44,11 @@ def test_mocked_uwsgi(uwsgi):
|
||||||
|
|
||||||
|
|
||||||
def test_mocked_uwsgi_tenant(uwsgi, tenant):
|
def test_mocked_uwsgi_tenant(uwsgi, tenant):
|
||||||
from tenant_schemas.utils import tenant_context
|
|
||||||
|
|
||||||
@hobo.multitenant.uwsgidecorators.spool
|
@hobo.multitenant.uwsgidecorators.spool
|
||||||
def function(a, b):
|
def function(a, b):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
with tenant_context(tenant):
|
function.spool(1, 2)
|
||||||
function.spool(1, 2)
|
|
||||||
|
|
||||||
assert set(uwsgi.spool.call_args[1].keys()) == {'body', 'tenant', 'name'}
|
assert set(uwsgi.spool.call_args[1].keys()) == {'body', 'tenant', 'name'}
|
||||||
assert pickle.loads(uwsgi.spool.call_args[1]['body']) == {'args': (1, 2), 'kwargs': {}}
|
assert pickle.loads(uwsgi.spool.call_args[1]['body']) == {'args': (1, 2), 'kwargs': {}}
|
||||||
|
|
Loading…
Reference in New Issue