hobo/tests_multitenant/test_hobo_notify.py

507 lines
21 KiB
Python

# -*- coding: utf-8 -*-
import logging
import pytest
from django.db import connection
from django.test.utils import CaptureQueriesContext
# Module-level pytest mark: applies the ``django_db`` mark to every test in this file.
pytestmark = pytest.mark.django_db
def test_hobo_notify_roles(caplog, tenants):
    """Run role notifications through ``Command.process_notification`` on every tenant.

    Scenarios, in order: a notification whose audience is not the tenant,
    a provision, a full provision, a deprovision, and finally update by
    uuid, uuid change and a uuid collision that must be logged as an error.
    """
    from django.contrib.auth.models import Group
    from tenant_schemas.utils import tenant_context
    from hobo.agent.common.management.commands.hobo_notify import Command
    from hobo.agent.common.models import Role

    def own_audience(tenant):
        # Audience list built from the tenant's own base URL.
        return [u'%s/saml/metadata' % tenant.get_base_url()]

    def role_objects(*roles):
        # Wrap role payloads in the ``objects`` envelope of a notification.
        return {u'@type': 'role', u'data': list(roles)}

    # An audience that is not the tenant's: nothing may be created.
    for tenant in tenants:
        with tenant_context(tenant):
            petite_enfance = {
                u'uuid': u'12345',
                u'name': u'Service petite enfance',
                u'slug': u'service-petite-enfance',
                u'description': u'Role du service petite enfance %s' % tenant.domain_url,
            }
            notification = {
                u'@type': u'provision',
                u'audience': [u'http://coin.com/saml/metadata'],
                u'objects': role_objects(petite_enfance),
            }
            Command.process_notification(tenant, notification)
            assert Group.objects.count() == 0
            assert Role.objects.count() == 0

    # Provision one role carrying every optional attribute.
    for tenant in tenants:
        with tenant_context(tenant):
            petite_enfance = {
                u'uuid': u'12345',
                u'name': u'Service petite enfance',
                u'slug': u'service-petite-enfance',
                u'description': u'Role du service petite enfance %s' % tenant.domain_url,
                u'details': u'Some details',
                u'emails': [u'foo@bar.com', u'test@entrouvert.org'],
                u'emails_to_members': False,
            }
            notification = {
                u'@type': u'provision',
                u'audience': own_audience(tenant),
                u'objects': role_objects(petite_enfance),
            }
            Command.process_notification(tenant, notification)
            assert Group.objects.count() == 1
            assert Role.objects.count() == 1
            provisioned = Role.objects.get()
            assert provisioned.uuid == u'12345'
            assert provisioned.name == u'Service petite enfance'
            assert provisioned.description == u'Role du service petite enfance %s' % tenant.domain_url
            assert provisioned.details == u'Some details'
            assert provisioned.emails == [u'foo@bar.com', u'test@entrouvert.org']
            assert provisioned.emails_to_members is False

    # Full provisioning: afterwards only the role listed here is present.
    for tenant in tenants:
        with tenant_context(tenant):
            etat_civil = {
                u'uuid': u'xyz',
                u'name': u'Service état civil',
                u'slug': u'service-etat-civil',
                u'description': u'Role du service état civil %s' % tenant.domain_url,
            }
            notification = {
                u'@type': u'provision',
                u'full': True,
                u'audience': own_audience(tenant),
                u'objects': role_objects(etat_civil),
            }
            Command.process_notification(tenant, notification)
            assert Group.objects.count() == 1
            assert Role.objects.count() == 1
            provisioned = Role.objects.get()
            assert provisioned.uuid == u'xyz'
            assert provisioned.name == u'Service état civil'
            assert provisioned.description == u'Role du service état civil %s' % tenant.domain_url

    # Deprovision that role: no role and no group remain.
    for tenant in tenants:
        with tenant_context(tenant):
            etat_civil = {
                u'uuid': u'xyz',
                u'name': u'Service état civil',
                u'slug': u'service-etat-civil',
                u'description': u'Role du service état civil %s' % tenant.domain_url,
            }
            notification = {
                u'@type': u'deprovision',
                u'audience': own_audience(tenant),
                u'objects': role_objects(etat_civil),
            }
            Command.process_notification(tenant, notification)
            assert Group.objects.count() == 0
            assert Role.objects.count() == 0

    # Collisions: every step below runs inside the same tenant context.
    for tenant in tenants:
        with tenant_context(tenant):
            description = u'Role du service petite enfance %s' % tenant.domain_url

            # Start from a single provisioned role.
            notification = {
                u'@type': u'provision',
                u'audience': own_audience(tenant),
                u'objects': role_objects(
                    {
                        u'uuid': u'12345',
                        u'name': u'Service petite enfance',
                        u'slug': u'service-petite-enfance',
                        u'description': description,
                    }
                ),
            }
            Command.process_notification(tenant, notification)
            assert Group.objects.count() == 1
            assert Role.objects.count() == 1

            # Same uuid, new name and slug: the existing role is renamed.
            notification = {
                u'@type': u'provision',
                u'audience': own_audience(tenant),
                u'objects': role_objects(
                    {
                        u'uuid': u'12345',
                        u'name': u'Service petite enfance2',
                        u'slug': u'service-petite-enfance2',
                        u'description': description,
                    }
                ),
            }
            Command.process_notification(tenant, notification)
            assert Group.objects.count() == 1
            assert Role.objects.count() == 1
            assert Role.objects.get().name == notification['objects']['data'][0]['name']

            # Same name, new uuid: the existing role takes the new uuid.
            notification = {
                u'@type': u'provision',
                u'audience': own_audience(tenant),
                u'objects': role_objects(
                    {
                        u'uuid': u'xyz',
                        u'name': u'Service petite enfance2',
                        u'slug': u'service-petite-enfance2',
                        u'description': description,
                    }
                ),
            }
            Command.process_notification(tenant, notification)
            assert Group.objects.count() == 1
            assert Role.objects.count() == 1
            assert Role.objects.get().uuid == notification['objects']['data'][0]['uuid']

            # A second role now owns uuid 12345; provisioning that uuid under the
            # other role's name must leave both rows and log an error.
            Role.objects.create(uuid='12345', name='Foo', description='foo')
            assert Role.objects.count() == 2
            notification = {
                u'@type': u'provision',
                u'audience': own_audience(tenant),
                u'objects': role_objects(
                    {
                        u'uuid': u'12345',
                        u'name': u'Service petite enfance2',
                        u'slug': u'service-petite-enfance2',
                        u'description': description,
                    }
                ),
            }
            Command.process_notification(tenant, notification)
            assert Group.objects.count() == 2
            assert Role.objects.count() == 2
            assert Role.objects.filter(uuid='12345', name=u'Service petite enfance').count() == 0
            last_record = caplog.records[-1]
            assert last_record.levelno == logging.ERROR
            assert last_record.msg.startswith('cannot provision')
            assert last_record.args == (u'Service petite enfance2', u'12345')
def test_hobo_notify_roles_db_queries(caplog, tenants):
    """Pin the number of SQL queries issued while processing role notifications.

    Query counts are asserted for a two-role provision, a full provision
    and a three-role deprovision; the intermediate re-provision only has
    its resulting row counts checked.
    """
    from django.contrib.auth.models import Group
    from tenant_schemas.utils import tenant_context
    from hobo.agent.common.management.commands.hobo_notify import Command
    from hobo.agent.common.models import Role

    def own_audience(tenant):
        # Audience list built from the tenant's own base URL.
        return ['%s/saml/metadata' % tenant.get_base_url()]

    def two_services(tenant):
        # Fresh payloads for the two roles provisioned and later deprovisioned.
        return [
            {
                'uuid': '12345',
                'name': 'Service petite enfance',
                'slug': 'service-petite-enfance',
                'description': 'Role du service petite enfance %s' % tenant.domain_url,
                'details': 'Some details',
                'emails': ['foo@bar.com', 'test@entrouvert.org'],
                'emails_to_members': False,
            },
            {
                'uuid': '6789',
                'name': 'Autre service',
                'slug': 'autre service',
                'description': "Role d'un autre service petite enfance %s" % tenant.domain_url,
            },
        ]

    def last_service():
        # Fresh payload for the role used by the full provisioning step.
        return {
            'uuid': '12sed45',
            'name': 'Le dernier service',
            'slug': 'le-dernier-service',
            'description': '',
        }

    # Provision two roles; the payload is built before the capture starts.
    for tenant in tenants:
        with tenant_context(tenant):
            notification = {
                '@type': 'provision',
                'audience': own_audience(tenant),
                'objects': {'@type': 'role', 'data': two_services(tenant)},
            }
            with CaptureQueriesContext(connection) as captured:
                Command.process_notification(tenant, notification)
            assert len(captured.captured_queries) == 33
            assert Group.objects.count() == 2
            assert Role.objects.count() == 2

    # Full provisioning with a single role: only that role is left.
    for tenant in tenants:
        with tenant_context(tenant):
            notification = {
                '@type': 'provision',
                'full': True,
                'audience': own_audience(tenant),
                'objects': {'@type': 'role', 'data': [last_service()]},
            }
            with CaptureQueriesContext(connection) as captured:
                Command.process_notification(tenant, notification)
            assert len(captured.captured_queries) == 39
            assert Group.objects.count() == 1
            assert Role.objects.count() == 1

    # Provision the first two roles again (not full): three roles in total.
    for tenant in tenants:
        with tenant_context(tenant):
            notification = {
                '@type': 'provision',
                'audience': own_audience(tenant),
                'objects': {'@type': 'role', 'data': two_services(tenant)},
            }
            Command.process_notification(tenant, notification)
            assert Group.objects.count() == 3
            assert Role.objects.count() == 3

    # Deprovision all three roles in one notification.
    for tenant in tenants:
        with tenant_context(tenant):
            notification = {
                '@type': 'deprovision',
                'audience': own_audience(tenant),
                'objects': {
                    '@type': 'role',
                    'data': [last_service()] + two_services(tenant),
                },
            }
            with CaptureQueriesContext(connection) as captured:
                Command.process_notification(tenant, notification)
            assert len(captured.captured_queries) == 18
            assert Group.objects.count() == 0
            assert Role.objects.count() == 0
def test_provision_users(tenants):
    """Provision a role and a user on every tenant, update the user, then remove it.

    The user payload references two roles, only one of which has been
    provisioned; the assertions expect the user to end up in one group.
    """
    from django.contrib.auth import get_user_model
    from django.contrib.auth.models import Group
    from tenant_schemas.utils import tenant_context
    from hobo.agent.common.management.commands.hobo_notify import Command
    from hobo.agent.common.models import Role

    User = get_user_model()

    def own_audience(tenant):
        # Audience list built from the tenant's own base URL.
        return [u'%s/saml/metadata' % tenant.get_base_url()]

    def john_doe(is_superuser):
        # Fresh user payload; only the superuser flag varies between steps.
        return {
            u'uuid': u'a' * 32,
            u'first_name': u'John',
            u'last_name': u'Doe',
            u'email': u'john.doe@example.net',
            u'is_superuser': is_superuser,
            u'roles': [
                {
                    u'uuid': u'12345',
                    u'name': u'Service petite enfance',
                    u'description': u'etc.',
                },
                {
                    u'uuid': u'xyz',
                    u'name': u'Service état civil',
                    u'description': u'etc.',
                },
            ],
        }

    # Provision the role the user will be attached to.
    for tenant in tenants:
        with tenant_context(tenant):
            role_description = u'Role du service petite enfance %s' % tenant.domain_url
            notification = {
                u'@type': u'provision',
                u'audience': own_audience(tenant),
                u'objects': {
                    u'@type': 'role',
                    u'data': [
                        {
                            u'uuid': u'12345',
                            u'name': u'Service petite enfance',
                            u'slug': u'service-petite-enfance',
                            u'description': role_description,
                        }
                    ],
                },
            }
            Command.process_notification(tenant, notification)
            assert Group.objects.count() == 1
            assert Role.objects.count() == 1
            provisioned_role = Role.objects.get()
            assert provisioned_role.uuid == u'12345'
            assert provisioned_role.name == u'Service petite enfance'
            assert provisioned_role.description == u'Role du service petite enfance %s' % tenant.domain_url

    # Provision a regular (non-superuser) user.
    for tenant in tenants:
        with tenant_context(tenant):
            notification = {
                u'@type': u'provision',
                u'issuer': 'http://idp.example.net/idp/saml/metadata',
                u'audience': own_audience(tenant),
                u'objects': {u'@type': 'user', u'data': [john_doe(False)]},
            }
            Command.process_notification(tenant, notification)
            assert User.objects.count() == 1
            assert Role.objects.count() == 1
            assert Group.objects.count() == 1
            user = User.objects.get()
            assert user.username == 'a' * 32
            assert user.first_name == 'John'
            assert user.last_name == 'Doe'
            assert user.email == 'john.doe@example.net'
            assert user.is_superuser is False
            assert user.is_staff is False
            assert user.saml_identifiers.count() == 1
            saml_identifier = user.saml_identifiers.get()
            assert saml_identifier.issuer.entity_id == 'http://idp.example.net/idp/saml/metadata'
            assert saml_identifier.name_id == 'a' * 32
            assert user.groups.count() == 1
            user_group = user.groups.get()
            assert user_group.name == 'Service petite enfance'
            linked_role = Role.objects.get(group_ptr=user_group.pk)
            assert linked_role.uuid == '12345'

    # Send the same user again, now flagged as superuser: no new rows are
    # created, and the superuser and staff flags are expected to be set.
    for tenant in tenants:
        with tenant_context(tenant):
            notification = {
                u'@type': u'provision',
                u'issuer': 'http://idp.example.net/idp/saml/metadata',
                u'audience': own_audience(tenant),
                u'objects': {u'@type': 'user', u'data': [john_doe(True)]},
            }
            Command.process_notification(tenant, notification)
            assert User.objects.count() == 1
            assert Role.objects.count() == 1
            assert Group.objects.count() == 1
            user = User.objects.get()
            assert user.username == 'a' * 32
            assert user.first_name == 'John'
            assert user.last_name == 'Doe'
            assert user.email == 'john.doe@example.net'
            assert user.is_superuser is True
            assert user.is_staff is True
            assert user.saml_identifiers.count() == 1
            saml_identifier = user.saml_identifiers.get()
            assert saml_identifier.issuer.entity_id == 'http://idp.example.net/idp/saml/metadata'
            assert saml_identifier.name_id == 'a' * 32
            assert user.groups.count() == 1
            user_group = user.groups.get()
            assert user_group.name == 'Service petite enfance'
            linked_role = Role.objects.get(group_ptr=user_group.pk)
            assert linked_role.uuid == '12345'

    # Deprovision by uuid only: the user must be gone.
    for tenant in tenants:
        with tenant_context(tenant):
            notification = {
                u'@type': u'deprovision',
                u'issuer': 'http://idp.example.net/idp/saml/metadata',
                u'audience': own_audience(tenant),
                u'objects': {
                    u'@type': 'user',
                    u'data': [
                        {
                            u'uuid': u'a' * 32,
                        }
                    ],
                },
            }
            Command.process_notification(tenant, notification)
            assert User.objects.count() == 0