359 lines
15 KiB
Python
359 lines
15 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import pytest
|
|
import logging
|
|
|
|
pytestmark = pytest.mark.django_db
|
|
|
|
|
|
def test_hobo_notify_roles(caplog, tenants):
    """Send role notifications to every tenant and check the stored roles.

    For each tenant, inside its ``tenant_context``, the test feeds
    ``Command.process_notification`` with, in order:

    1. a provision whose audience is not built from the tenant base URL,
    2. a plain provision,
    3. a provision flagged ``full``,
    4. a deprovision,
    5. a chain of provisions (rename, uuid change, then a payload whose
       uuid and name belong to two different stored roles).

    After each call the number of ``Group`` and ``Role`` rows is asserted;
    the last step also expects the latest ``caplog`` record to be an ERROR.
    """
    from hobo.agent.common.management.commands.hobo_notify import Command
    from tenant_schemas.utils import tenant_context
    from django.contrib.auth.models import Group
    from hobo.agent.common.models import Role

    # Description templates; the tenant's domain_url is interpolated later.
    childcare_description = u'Role du service petite enfance %s'
    registry_description = u'Role du service état civil %s'

    def tenant_audience(site):
        # Audience list derived from the tenant's base URL.
        return [u'%s/saml/metadata' % site.get_base_url()]

    def role_entry(uuid, name, slug, description):
        # One item of the notification's objects/data list.
        return {
            u'uuid': uuid,
            u'name': name,
            u'slug': slug,
            u'description': description,
        }

    def role_message(action, audience, entry, full=False):
        # Assemble a notification carrying exactly one role entry.
        message = {u'@type': action}
        if full:
            # Only the full-provisioning step sets this key.
            message[u'full'] = True
        message[u'audience'] = audience
        message[u'objects'] = {u'@type': 'role', u'data': [entry]}
        return message

    # Step 1: audience not derived from the tenant -> nothing is created.
    for tenant in tenants:
        with tenant_context(tenant):
            notification = role_message(
                u'provision',
                [u'http://coin.com/saml/metadata'],
                role_entry(
                    u'12345',
                    u'Service petite enfance',
                    u'service-petite-enfance',
                    childcare_description % tenant.domain_url,
                ),
            )
            Command.process_notification(tenant, notification)
            assert Group.objects.count() == 0
            assert Role.objects.count() == 0

    # Step 2: plain provision -> exactly one role with the sent attributes.
    for tenant in tenants:
        with tenant_context(tenant):
            notification = role_message(
                u'provision',
                tenant_audience(tenant),
                role_entry(
                    u'12345',
                    u'Service petite enfance',
                    u'service-petite-enfance',
                    childcare_description % tenant.domain_url,
                ),
            )
            Command.process_notification(tenant, notification)
            assert Group.objects.count() == 1
            assert Role.objects.count() == 1
            stored = Role.objects.get()
            assert stored.uuid == u'12345'
            assert stored.name == u'Service petite enfance'
            assert stored.description == childcare_description % tenant.domain_url

    # Step 3: full provisioning with a different role -> still a single
    # role, and it is the one from this notification.
    for tenant in tenants:
        with tenant_context(tenant):
            notification = role_message(
                u'provision',
                tenant_audience(tenant),
                role_entry(
                    u'xyz',
                    u'Service état civil',
                    u'service-etat-civil',
                    registry_description % tenant.domain_url,
                ),
                full=True,
            )
            Command.process_notification(tenant, notification)
            assert Group.objects.count() == 1
            assert Role.objects.count() == 1
            stored = Role.objects.get()
            assert stored.uuid == u'xyz'
            assert stored.name == u'Service état civil'
            assert stored.description == registry_description % tenant.domain_url

    # Step 4: deprovision that role -> no group and no role remain.
    for tenant in tenants:
        with tenant_context(tenant):
            notification = role_message(
                u'deprovision',
                tenant_audience(tenant),
                role_entry(
                    u'xyz',
                    u'Service état civil',
                    u'service-etat-civil',
                    registry_description % tenant.domain_url,
                ),
            )
            Command.process_notification(tenant, notification)
            assert Group.objects.count() == 0
            assert Role.objects.count() == 0

    # Step 5: updates and collisions, all within one tenant context.
    for tenant in tenants:
        with tenant_context(tenant):
            # 5a. start from a single provisioned role
            notification = role_message(
                u'provision',
                tenant_audience(tenant),
                role_entry(
                    u'12345',
                    u'Service petite enfance',
                    u'service-petite-enfance',
                    childcare_description % tenant.domain_url,
                ),
            )
            Command.process_notification(tenant, notification)
            assert Group.objects.count() == 1
            assert Role.objects.count() == 1

            # 5b. same uuid, new name/slug -> the stored role takes the new name
            notification = role_message(
                u'provision',
                tenant_audience(tenant),
                role_entry(
                    u'12345',
                    u'Service petite enfance2',
                    u'service-petite-enfance2',
                    childcare_description % tenant.domain_url,
                ),
            )
            Command.process_notification(tenant, notification)
            assert Group.objects.count() == 1
            assert Role.objects.count() == 1
            assert Role.objects.get().name == notification['objects']['data'][0]['name']

            # 5c. same name, new uuid -> the stored role takes the new uuid
            notification = role_message(
                u'provision',
                tenant_audience(tenant),
                role_entry(
                    u'xyz',
                    u'Service petite enfance2',
                    u'service-petite-enfance2',
                    childcare_description % tenant.domain_url,
                ),
            )
            Command.process_notification(tenant, notification)
            assert Group.objects.count() == 1
            assert Role.objects.count() == 1
            assert Role.objects.get().uuid == notification['objects']['data'][0]['uuid']

            # 5d. create a second role owning uuid '12345', then send a
            # payload whose uuid matches that role and whose name matches
            # the other one: an ERROR must be logged and both rows kept.
            Role.objects.create(uuid='12345', name='Foo', description='foo')
            assert Role.objects.count() == 2
            notification = role_message(
                u'provision',
                tenant_audience(tenant),
                role_entry(
                    u'12345',
                    u'Service petite enfance2',
                    u'service-petite-enfance2',
                    childcare_description % tenant.domain_url,
                ),
            )
            Command.process_notification(tenant, notification)
            assert Group.objects.count() == 2
            assert Role.objects.count() == 2
            # NOTE(review): this filters on the name without the trailing
            # '2', whereas the payload above uses u'Service petite enfance2'
            # -- confirm which name was meant to be checked here.
            assert Role.objects.filter(uuid='12345', name=u'Service petite enfance').count() == 0
            last_record = caplog.records[-1]
            assert last_record.levelno == logging.ERROR
            assert last_record.msg.startswith('cannot provision')
            assert last_record.args == (u'Service petite enfance2', u'12345')
|
|
|
|
|
|
def test_provision_users(tenants):
    """Send user notifications to every tenant and check the stored user.

    For each tenant, inside its ``tenant_context``, the test:

    1. provisions one role (uuid ``12345``),
    2. provisions a user that lists two roles, only one of which exists,
    3. provisions the same user again with ``is_superuser`` set to True,
    4. deprovisions the user.

    The user, its SAML identifier and its group membership are asserted
    after steps 2 and 3; step 4 expects no user to remain.
    """
    from hobo.agent.common.management.commands.hobo_notify import Command
    from tenant_schemas.utils import tenant_context
    from django.contrib.auth import get_user_model
    from django.contrib.auth.models import Group
    from hobo.agent.common.models import Role

    User = get_user_model()

    def tenant_audience(site):
        # Audience list derived from the tenant's base URL.
        return [u'%s/saml/metadata' % site.get_base_url()]

    def user_message(action, audience, entry):
        # Assemble a user notification carrying exactly one entry.
        return {
            u'@type': action,
            u'issuer': 'http://idp.example.net/idp/saml/metadata',
            u'audience': audience,
            u'objects': {u'@type': 'user', u'data': [entry]},
        }

    def john_doe(is_superuser):
        # User payload; lists role '12345' (provisioned in step 1) and
        # role 'xyz' (never provisioned in this test).
        return {
            u'uuid': u'a' * 32,
            u'first_name': u'John',
            u'last_name': u'Doe',
            u'email': u'john.doe@example.net',
            u'is_superuser': is_superuser,
            u'roles': [
                {
                    u'uuid': u'12345',
                    u'name': u'Service petite enfance',
                    u'description': u'etc.',
                },
                {
                    u'uuid': u'xyz',
                    u'name': u'Service état civil',
                    u'description': u'etc.',
                },
            ],
        }

    def check_john_doe(privileged):
        # Shared expectations after a user provision; ``privileged`` is the
        # value expected for both is_superuser and is_staff.
        assert User.objects.count() == 1
        assert Role.objects.count() == 1
        assert Group.objects.count() == 1
        account = User.objects.get()
        # The uuid sent is 32 characters long, the username expected is 30.
        assert account.username == 'a' * 30
        assert account.first_name == 'John'
        assert account.last_name == 'Doe'
        assert account.email == 'john.doe@example.net'
        assert account.is_superuser is privileged
        assert account.is_staff is privileged
        assert account.saml_identifiers.count() == 1
        identifier = account.saml_identifiers.get()
        assert identifier.issuer == 'http://idp.example.net/idp/saml/metadata'
        assert identifier.name_id == 'a' * 32
        # Only the role that exists ends up as a group of the user.
        assert account.groups.count() == 1
        membership = account.groups.get()
        assert membership.name == 'Service petite enfance'
        linked_role = Role.objects.get(group_ptr=membership.pk)
        assert linked_role.uuid == '12345'

    # Step 1: provision the role the user will be attached to.
    for tenant in tenants:
        with tenant_context(tenant):
            notification = {
                u'@type': u'provision',
                u'audience': tenant_audience(tenant),
                u'objects': {
                    u'@type': 'role',
                    u'data': [
                        {
                            u'uuid': u'12345',
                            u'name': u'Service petite enfance',
                            u'slug': u'service-petite-enfance',
                            u'description': u'Role du service petite enfance %s' % tenant.domain_url,
                        }
                    ],
                },
            }
            Command.process_notification(tenant, notification)
            assert Group.objects.count() == 1
            assert Role.objects.count() == 1
            stored = Role.objects.get()
            assert stored.uuid == u'12345'
            assert stored.name == u'Service petite enfance'
            assert stored.description == u'Role du service petite enfance %s' % tenant.domain_url

    # Step 2: provision a regular (non-superuser) user.
    for tenant in tenants:
        with tenant_context(tenant):
            notification = user_message(
                u'provision', tenant_audience(tenant), john_doe(False))
            Command.process_notification(tenant, notification)
            check_john_doe(False)

    # Step 3: send the same user again, this time with is_superuser=True.
    # No extra user/role/group appears, and both is_superuser and is_staff
    # are expected to be True afterwards.
    for tenant in tenants:
        with tenant_context(tenant):
            notification = user_message(
                u'provision', tenant_audience(tenant), john_doe(True))
            Command.process_notification(tenant, notification)
            check_john_doe(True)

    # Step 4: deprovision by uuid only -> the user is gone.
    for tenant in tenants:
        with tenant_context(tenant):
            notification = user_message(
                u'deprovision', tenant_audience(tenant), {u'uuid': u'a' * 32})
            Command.process_notification(tenant, notification)
            assert User.objects.count() == 0
|