# Tests for the hobo_notify management command: role and user provisioning per tenant.
import logging

import pytest
from django.db import connection
from django.test.utils import CaptureQueriesContext

# pytest-django marker applied module-wide: every test in this file uses the database.
pytestmark = pytest.mark.django_db


def test_hobo_notify_roles(caplog, tenants):
    """Check role provisioning and deprovisioning via ``Command.process_notification``.

    Each scenario is replayed for every tenant, inside that tenant's context:

    * a notification addressed to a foreign audience must create nothing;
    * a provision must create one Role/Group pair carrying every sent field;
    * a full provision must leave only the role it lists;
    * a deprovision must remove that role;
    * renaming by uuid and changing the uuid must keep a single role, while a
      notification that clashes with two existing roles must be logged as an
      error.
    """
    from django.contrib.auth.models import Group
    from tenant_schemas.utils import tenant_context

    from hobo.agent.common.management.commands.hobo_notify import Command
    from hobo.agent.common.models import Role

    def own_audience(tenant):
        # Audience built from the tenant's base URL; the scenarios below expect it to be accepted.
        return ['%s/saml/metadata' % tenant.get_base_url()]

    def petite_enfance(tenant, role_uuid='12345', suffix='', **optional):
        # "Service petite enfance" role payload; `optional` appends extra keys in call order.
        payload = {
            'uuid': role_uuid,
            'name': 'Service petite enfance' + suffix,
            'slug': 'service-petite-enfance' + suffix,
            'description': 'Role du service petite enfance %s' % tenant.domain_url,
        }
        payload.update(optional)
        return payload

    def etat_civil(tenant):
        # "Service état civil" role payload.
        return {
            'uuid': 'xyz',
            'name': 'Service état civil',
            'slug': 'service-etat-civil',
            'description': 'Role du service état civil %s' % tenant.domain_url,
        }

    def deliver(tenant, kind, audience, role, full=False):
        # Wrap a single role in a notification, hand it to the command and return what was sent.
        message = {'@type': kind}
        if full:
            message['full'] = True
        message['audience'] = audience
        message['objects'] = {'@type': 'role', 'data': [role]}
        Command.process_notification(tenant, message)
        return message

    def expect_count(expected):
        # Groups and roles are always expected in equal numbers in this test.
        assert Group.objects.count() == expected
        assert Role.objects.count() == expected

    # A notification meant for another audience must be ignored.
    for tenant in tenants:
        with tenant_context(tenant):
            deliver(tenant, 'provision', ['http://coin.com/saml/metadata'], petite_enfance(tenant))
            expect_count(0)

    # Plain provisioning creates the role with all its attributes.
    for tenant in tenants:
        with tenant_context(tenant):
            deliver(
                tenant,
                'provision',
                own_audience(tenant),
                petite_enfance(
                    tenant,
                    details='Some details',
                    emails=['foo@bar.com', 'test@entrouvert.org'],
                    emails_to_members=False,
                ),
            )
            expect_count(1)
            created = Role.objects.get()
            assert created.uuid == '12345'
            assert created.name == 'Service petite enfance'
            assert created.slug == 'service-petite-enfance'
            assert created.description == 'Role du service petite enfance %s' % tenant.domain_url
            assert created.details == 'Some details'
            assert created.emails == ['foo@bar.com', 'test@entrouvert.org']
            assert created.emails_to_members is False

    # Full provisioning: only the role listed in the notification remains afterwards.
    for tenant in tenants:
        with tenant_context(tenant):
            deliver(tenant, 'provision', own_audience(tenant), etat_civil(tenant), full=True)
            expect_count(1)
            remaining = Role.objects.get()
            assert remaining.uuid == 'xyz'
            assert remaining.name == 'Service état civil'
            assert remaining.description == 'Role du service état civil %s' % tenant.domain_url

    # Deprovisioning removes the role.
    for tenant in tenants:
        with tenant_context(tenant):
            deliver(tenant, 'deprovision', own_audience(tenant), etat_civil(tenant))
            expect_count(0)

    # Collisions: the four steps below run back to back within each tenant.
    for tenant in tenants:
        with tenant_context(tenant):
            deliver(tenant, 'provision', own_audience(tenant), petite_enfance(tenant))
            expect_count(1)

            # Same uuid, new name and slug: the existing role is updated.
            sent = deliver(tenant, 'provision', own_audience(tenant), petite_enfance(tenant, suffix='2'))
            expect_count(1)
            assert Role.objects.get().name == sent['objects']['data'][0]['name']

            # Same name and slug, new uuid: the existing role takes the new uuid.
            sent = deliver(
                tenant,
                'provision',
                own_audience(tenant),
                petite_enfance(tenant, role_uuid='xyz', suffix='2'),
            )
            expect_count(1)
            assert Role.objects.get().uuid == sent['objects']['data'][0]['uuid']

            # A second role now owns uuid '12345'; the next notification matches one
            # role by uuid and another by name, which must be reported as an error.
            Role.objects.create(uuid='12345', name='Foo', description='foo')
            assert Role.objects.count() == 2
            deliver(tenant, 'provision', own_audience(tenant), petite_enfance(tenant, suffix='2'))
            expect_count(2)
            assert Role.objects.filter(uuid='12345', name='Service petite enfance').count() == 0
            last_record = caplog.records[-1]
            assert last_record.levelno == logging.ERROR
            assert last_record.msg.startswith('cannot provision')
            assert last_record.args == ('Service petite enfance2', '12345')


def test_hobo_notify_roles_db_queries(caplog, tenants):
    """Pin the number of SQL queries issued while processing role notifications.

    For every tenant the test provisions two roles (33 queries), runs a full
    provision listing a single other role (36 queries), provisions the first
    two roles again, then deprovisions all three (18 queries). Group and Role
    counts are checked after each step.
    """
    from django.contrib.auth.models import Group
    from tenant_schemas.utils import tenant_context

    from hobo.agent.common.management.commands.hobo_notify import Command
    from hobo.agent.common.models import Role

    def audience_of(tenant):
        # Audience built from the tenant's base URL.
        return ['%s/saml/metadata' % tenant.get_base_url()]

    def two_services(tenant):
        # The pair of roles provisioned in the first and third steps.
        return [
            {
                'uuid': '12345',
                'name': 'Service petite enfance',
                'slug': 'service-petite-enfance',
                'description': 'Role du service petite enfance %s' % tenant.domain_url,
                'details': 'Some details',
                'emails': ['foo@bar.com', 'test@entrouvert.org'],
                'emails_to_members': False,
            },
            {
                'uuid': '6789',
                'name': 'Autre service',
                'slug': 'autre service',
                'description': "Role d'un autre service petite enfance %s" % tenant.domain_url,
            },
        ]

    def last_service():
        # The single role sent by the full provision.
        return {
            'uuid': '12sed45',
            'name': 'Le dernier service',
            'slug': 'le-dernier-service',
            'description': '',
        }

    def envelope(kind, audience, roles, full=False):
        # Assemble the notification only; building it stays outside any query capture.
        message = {'@type': kind}
        if full:
            message['full'] = True
        message['audience'] = audience
        message['objects'] = {'@type': 'role', 'data': roles}
        return message

    def queries_spent(tenant, message):
        # Number of queries captured on `connection` while the command handles `message`.
        with CaptureQueriesContext(connection) as captured:
            Command.process_notification(tenant, message)
        return len(captured.captured_queries)

    def expect_count(expected):
        # Groups and roles are always expected in equal numbers in this test.
        assert Group.objects.count() == expected
        assert Role.objects.count() == expected

    # Provision two roles.
    for tenant in tenants:
        with tenant_context(tenant):
            message = envelope('provision', audience_of(tenant), two_services(tenant))
            assert queries_spent(tenant, message) == 33
            expect_count(2)

    # Full provision listing one other role: it is the only one left.
    for tenant in tenants:
        with tenant_context(tenant):
            message = envelope('provision', audience_of(tenant), [last_service()], full=True)
            assert queries_spent(tenant, message) == 36
            expect_count(1)

    # Provision the first two roles again (query count not checked here).
    for tenant in tenants:
        with tenant_context(tenant):
            message = envelope('provision', audience_of(tenant), two_services(tenant))
            Command.process_notification(tenant, message)
            expect_count(3)

    # Deprovision all three roles in one notification.
    for tenant in tenants:
        with tenant_context(tenant):
            message = envelope(
                'deprovision',
                audience_of(tenant),
                [last_service()] + two_services(tenant),
            )
            assert queries_spent(tenant, message) == 18

            expect_count(0)


def test_provision_users(tenants):
    """Check user provisioning and deprovisioning via ``Command.process_notification``.

    For every tenant a single role is provisioned first. The same user is then
    sent twice, once as a regular user and once as a superuser, and must stay a
    single account whose ``is_superuser`` and ``is_staff`` both follow the flag
    sent. Of the two roles listed for the user only the provisioned one is
    expected as a group membership. Finally the user is deprovisioned.
    """
    from django.contrib.auth import get_user_model
    from django.contrib.auth.models import Group
    from tenant_schemas.utils import tenant_context

    from hobo.agent.common.management.commands.hobo_notify import Command
    from hobo.agent.common.models import Role

    User = get_user_model()

    idp_entity_id = 'http://idp.example.net/idp/saml/metadata'
    john_uuid = 'a' * 32

    def audience_of(tenant):
        # Audience built from the tenant's base URL.
        return ['%s/saml/metadata' % tenant.get_base_url()]

    def john(is_superuser):
        # User payload; it lists the provisioned role plus one ('xyz') that was never provisioned.
        return {
            'uuid': john_uuid,
            'first_name': 'John',
            'last_name': 'Doe',
            'email': 'john.doe@example.net',
            'is_superuser': is_superuser,
            'roles': [
                {
                    'uuid': '12345',
                    'name': 'Service petite enfance',
                    'description': 'etc.',
                },
                {
                    'uuid': 'xyz',
                    'name': 'Service état civil',
                    'description': 'etc.',
                },
            ],
        }

    def deliver(tenant, kind, audience, object_type, data, issuer=None):
        # Build the notification (issuer only when given) and hand it to the command.
        message = {'@type': kind}
        if issuer is not None:
            message['issuer'] = issuer
        message['audience'] = audience
        message['objects'] = {'@type': object_type, 'data': data}
        Command.process_notification(tenant, message)

    # Provision the role the user will be attached to.
    for tenant in tenants:
        with tenant_context(tenant):
            deliver(
                tenant,
                'provision',
                audience_of(tenant),
                'role',
                [
                    {
                        'uuid': '12345',
                        'name': 'Service petite enfance',
                        'slug': 'service-petite-enfance',
                        'description': 'Role du service petite enfance %s' % tenant.domain_url,
                    }
                ],
            )
            assert Group.objects.count() == 1
            assert Role.objects.count() == 1
            provisioned = Role.objects.get()
            assert provisioned.uuid == '12345'
            assert provisioned.name == 'Service petite enfance'
            assert provisioned.description == 'Role du service petite enfance %s' % tenant.domain_url

    # First pass provisions the user as a regular account. The second pass sends
    # the same user as a superuser: no duplicate may appear and both
    # is_superuser and is_staff must follow the flag.
    for superuser in (False, True):
        for tenant in tenants:
            with tenant_context(tenant):
                deliver(
                    tenant,
                    'provision',
                    audience_of(tenant),
                    'user',
                    [john(superuser)],
                    issuer=idp_entity_id,
                )
                assert User.objects.count() == 1
                assert Role.objects.count() == 1
                assert Group.objects.count() == 1
                account = User.objects.get()
                assert account.username == john_uuid
                assert account.first_name == 'John'
                assert account.last_name == 'Doe'
                assert account.email == 'john.doe@example.net'
                assert account.is_superuser is superuser
                assert account.is_staff is superuser
                # Exactly one SAML identifier, tied to the notification's issuer.
                assert account.saml_identifiers.count() == 1
                identifier = account.saml_identifiers.get()
                assert identifier.issuer.entity_id == idp_entity_id
                assert identifier.name_id == john_uuid
                # Only the provisioned role shows up as a group membership.
                assert account.groups.count() == 1
                membership = account.groups.get()
                assert membership.name == 'Service petite enfance'
                linked_role = Role.objects.get(group_ptr=membership.pk)
                assert linked_role.uuid == '12345'

    # Deprovisioning by uuid removes the user.
    for tenant in tenants:
        with tenant_context(tenant):
            deliver(
                tenant,
                'deprovision',
                audience_of(tenant),
                'user',
                [
                    {
                        'uuid': john_uuid,
                    }
                ],
                issuer=idp_entity_id,
            )
            assert User.objects.count() == 0