# hobo/tests_multitenant/test_hobo_notify.py (506 lines, 21 KiB, Python)

import logging
import pytest
from django.db import connection
from django.test.utils import CaptureQueriesContext
pytestmark = pytest.mark.django_db
def test_hobo_notify_roles(caplog, tenants):
    """Run role provision/deprovision notifications against every tenant.

    Scenarios, in order: a notification whose audience is not the tenant's
    own metadata URL, a plain provision, a full provision, a deprovision and
    finally a series of uuid/name collision cases, the last of which is
    expected to leave an ERROR record in ``caplog``.
    """
    from django.contrib.auth.models import Group
    from tenant_schemas.utils import tenant_context

    from hobo.agent.common.management.commands.hobo_notify import Command
    from hobo.agent.common.models import Role

    def own_audience(tenant):
        # Audience list built from the tenant's own base URL.
        return ['%s/saml/metadata' % tenant.get_base_url()]

    def role_message(kind, audience, roles, full=False):
        # Keys are inserted in the order '@type', ['full'], 'audience', 'objects'.
        message = {'@type': kind}
        if full:
            message['full'] = True
        message['audience'] = audience
        message['objects'] = {'@type': 'role', 'data': roles}
        return message

    def petite_enfance(tenant, role_uuid='12345', suffix='', **extra):
        # "Service petite enfance" role payload; `suffix` is appended to name and slug.
        payload = {
            'uuid': role_uuid,
            'name': 'Service petite enfance' + suffix,
            'slug': 'service-petite-enfance' + suffix,
            'description': 'Role du service petite enfance %s' % tenant.domain_url,
        }
        payload.update(extra)
        return payload

    def etat_civil(tenant):
        # "Service état civil" role payload.
        return {
            'uuid': 'xyz',
            'name': 'Service état civil',
            'slug': 'service-etat-civil',
            'description': 'Role du service état civil %s' % tenant.domain_url,
        }

    def expect_counts(groups, roles):
        assert Group.objects.count() == groups
        assert Role.objects.count() == roles

    # wrong audience: nothing gets created
    for tenant in tenants:
        with tenant_context(tenant):
            message = role_message('provision', ['http://coin.com/saml/metadata'], [petite_enfance(tenant)])
            Command.process_notification(tenant, message)
            expect_counts(0, 0)

    # provision: the role is created with every attribute that was sent
    for tenant in tenants:
        with tenant_context(tenant):
            sent = petite_enfance(
                tenant,
                details='Some details',
                emails=['foo@bar.com', 'test@entrouvert.org'],
                emails_to_members=False,
            ) if False else None
            message = role_message(
                'provision',
                own_audience(tenant),
                [
                    petite_enfance(
                        tenant,
                        details='Some details',
                        emails=['foo@bar.com', 'test@entrouvert.org'],
                        emails_to_members=False,
                    )
                ],
            )
            sent = message['objects']['data'][0]
            Command.process_notification(tenant, message)
            expect_counts(1, 1)
            role = Role.objects.get()
            for attribute in ('uuid', 'name', 'slug', 'description', 'details', 'emails'):
                assert getattr(role, attribute) == sent[attribute]
            assert role.emails_to_members is False

    # full provisioning: the single role now present is the one just sent
    for tenant in tenants:
        with tenant_context(tenant):
            message = role_message('provision', own_audience(tenant), [etat_civil(tenant)], full=True)
            sent = message['objects']['data'][0]
            Command.process_notification(tenant, message)
            expect_counts(1, 1)
            role = Role.objects.get()
            for attribute in ('uuid', 'name', 'description'):
                assert getattr(role, attribute) == sent[attribute]

    # deprovision: the role is removed
    for tenant in tenants:
        with tenant_context(tenant):
            message = role_message('deprovision', own_audience(tenant), [etat_civil(tenant)])
            Command.process_notification(tenant, message)
            expect_counts(0, 0)

    # collisions
    for tenant in tenants:
        with tenant_context(tenant):
            # initial provision
            message = role_message('provision', own_audience(tenant), [petite_enfance(tenant)])
            Command.process_notification(tenant, message)
            expect_counts(1, 1)

            # update by uuid: same uuid, new name and slug
            message = role_message('provision', own_audience(tenant), [petite_enfance(tenant, suffix='2')])
            Command.process_notification(tenant, message)
            expect_counts(1, 1)
            assert Role.objects.get().name == message['objects']['data'][0]['name']

            # uuid change: same name and slug, new uuid
            message = role_message(
                'provision', own_audience(tenant), [petite_enfance(tenant, role_uuid='xyz', suffix='2')]
            )
            Command.process_notification(tenant, message)
            expect_counts(1, 1)
            assert Role.objects.get().uuid == message['objects']['data'][0]['uuid']

            # error on collision: one role holds the uuid, another holds the name
            Role.objects.create(uuid='12345', name='Foo', description='foo')
            assert Role.objects.count() == 2
            message = role_message('provision', own_audience(tenant), [petite_enfance(tenant, suffix='2')])
            Command.process_notification(tenant, message)
            expect_counts(2, 2)
            # NOTE(review): the notification above carries the name
            # 'Service petite enfance2', so filtering on 'Service petite enfance'
            # looks trivially empty -- confirm which name was intended.
            assert Role.objects.filter(uuid='12345', name='Service petite enfance').count() == 0
            last_record = caplog.records[-1]
            assert last_record.levelno == logging.ERROR
            assert last_record.msg.startswith('cannot provision')
            assert last_record.args == ('Service petite enfance2', '12345')
def test_hobo_notify_roles_db_queries(caplog, tenants):
    """Pin the number of SQL queries issued while processing role notifications.

    For every tenant: provision two roles (13 queries), full-provision a
    single other role (14 queries), provision the first two again (not
    counted) and deprovision all three (8 queries).
    """
    from django.contrib.auth.models import Group
    from tenant_schemas.utils import tenant_context

    from hobo.agent.common.management.commands.hobo_notify import Command
    from hobo.agent.common.models import Role

    def own_audience(tenant):
        # Audience list built from the tenant's own base URL.
        return ['%s/saml/metadata' % tenant.get_base_url()]

    def role_message(kind, audience, roles, full=False):
        # Keys are inserted in the order '@type', ['full'], 'audience', 'objects'.
        message = {'@type': kind}
        if full:
            message['full'] = True
        message['audience'] = audience
        message['objects'] = {'@type': 'role', 'data': roles}
        return message

    def two_services(tenant):
        # The pair of roles used by the provision and re-provision steps.
        return [
            {
                'uuid': '12345',
                'name': 'Service petite enfance',
                'slug': 'service-petite-enfance',
                'description': 'Role du service petite enfance %s' % tenant.domain_url,
                'details': 'Some details',
                'emails': ['foo@bar.com', 'test@entrouvert.org'],
                'emails_to_members': False,
            },
            {
                'uuid': '6789',
                'name': 'Autre service',
                'slug': 'autre service',
                'description': "Role d'un autre service petite enfance %s" % tenant.domain_url,
            },
        ]

    def last_service():
        # The single role sent by the full provisioning step.
        return {
            'uuid': '12sed45',
            'name': 'Le dernier service',
            'slug': 'le-dernier-service',
            'description': '',
        }

    def process_counting_queries(tenant, message, expected_queries):
        # Only the process_notification() call runs inside the capture context;
        # the message has already been built by the caller.
        with CaptureQueriesContext(connection) as ctx:
            Command.process_notification(tenant, message)
            assert len(ctx.captured_queries) == expected_queries

    def expect_counts(groups, roles):
        assert Group.objects.count() == groups
        assert Role.objects.count() == roles

    # provision
    for tenant in tenants:
        with tenant_context(tenant):
            message = role_message('provision', own_audience(tenant), two_services(tenant))
            process_counting_queries(tenant, message, 13)
            expect_counts(2, 2)

    # full provision
    for tenant in tenants:
        with tenant_context(tenant):
            message = role_message('provision', own_audience(tenant), [last_service()], full=True)
            process_counting_queries(tenant, message, 14)
            expect_counts(1, 1)

    # provision again (queries are not counted here)
    for tenant in tenants:
        with tenant_context(tenant):
            message = role_message('provision', own_audience(tenant), two_services(tenant))
            Command.process_notification(tenant, message)
            expect_counts(3, 3)

    # deprovision all three roles
    for tenant in tenants:
        with tenant_context(tenant):
            message = role_message(
                'deprovision', own_audience(tenant), [last_service()] + two_services(tenant)
            )
            process_counting_queries(tenant, message, 8)
            expect_counts(0, 0)
def test_provision_users(tenants):
    """Run user provision/deprovision notifications against every tenant.

    A role is provisioned first; a user referencing that role and one other
    role uuid is then provisioned, provisioned a second time with
    ``is_superuser`` set to True, and finally deprovisioned.
    """
    from django.contrib.auth import get_user_model
    from django.contrib.auth.models import Group
    from tenant_schemas.utils import tenant_context

    from hobo.agent.common.management.commands.hobo_notify import Command
    from hobo.agent.common.models import Role

    User = get_user_model()
    idp_entity_id = 'http://idp.example.net/idp/saml/metadata'
    user_uuid = 'a' * 32

    def own_audience(tenant):
        # Audience list built from the tenant's own base URL.
        return ['%s/saml/metadata' % tenant.get_base_url()]

    def user_message(kind, tenant, users):
        # User notification; keys are '@type', 'issuer', 'audience', 'objects'.
        return {
            '@type': kind,
            'issuer': idp_entity_id,
            'audience': own_audience(tenant),
            'objects': {'@type': 'user', 'data': users},
        }

    def john_doe(is_superuser):
        # User payload referencing role '12345' and role 'xyz'.
        return {
            'uuid': user_uuid,
            'first_name': 'John',
            'last_name': 'Doe',
            'email': 'john.doe@example.net',
            'is_superuser': is_superuser,
            'roles': [
                {
                    'uuid': '12345',
                    'name': 'Service petite enfance',
                    'description': 'etc.',
                },
                {
                    'uuid': 'xyz',
                    'name': 'Service état civil',
                    'description': 'etc.',
                },
            ],
        }

    def check_single_user(admin):
        # One user, one role, one group; `admin` is the expected value of
        # both is_superuser and is_staff.
        assert User.objects.count() == 1
        assert Role.objects.count() == 1
        assert Group.objects.count() == 1
        user = User.objects.get()
        assert user.username == user_uuid
        assert user.first_name == 'John'
        assert user.last_name == 'Doe'
        assert user.email == 'john.doe@example.net'
        assert user.is_superuser is admin
        assert user.is_staff is admin
        assert user.saml_identifiers.count() == 1
        identifier = user.saml_identifiers.get()
        assert identifier.issuer.entity_id == idp_entity_id
        assert identifier.name_id == user_uuid
        # Only one group is attached although two roles were sent.
        assert user.groups.count() == 1
        group = user.groups.get()
        assert group.name == 'Service petite enfance'
        role = Role.objects.get(group_ptr=group.pk)
        assert role.uuid == '12345'

    # provision a role
    for tenant in tenants:
        with tenant_context(tenant):
            message = {
                '@type': 'provision',
                'audience': own_audience(tenant),
                'objects': {
                    '@type': 'role',
                    'data': [
                        {
                            'uuid': '12345',
                            'name': 'Service petite enfance',
                            'slug': 'service-petite-enfance',
                            'description': 'Role du service petite enfance %s' % tenant.domain_url,
                        }
                    ],
                },
            }
            Command.process_notification(tenant, message)
            assert Group.objects.count() == 1
            assert Role.objects.count() == 1
            role = Role.objects.get()
            assert role.uuid == '12345'
            assert role.name == 'Service petite enfance'
            assert role.description == 'Role du service petite enfance %s' % tenant.domain_url

    # user provisioning
    for tenant in tenants:
        with tenant_context(tenant):
            Command.process_notification(tenant, user_message('provision', tenant, [john_doe(False)]))
            check_single_user(admin=False)

    # second run, now with is_superuser=True: still one user, role and group,
    # and the superuser/staff flags are now True
    for tenant in tenants:
        with tenant_context(tenant):
            Command.process_notification(tenant, user_message('provision', tenant, [john_doe(True)]))
            check_single_user(admin=True)

    # deprovision removes the user
    for tenant in tenants:
        with tenant_context(tenant):
            Command.process_notification(tenant, user_message('deprovision', tenant, [{'uuid': user_uuid}]))
            assert User.objects.count() == 0