hobo/tests_authentic/test_provisionning.py

489 lines
22 KiB
Python

# -*- coding: utf-8 -*-
import json
import pytest
import lasso
from mock import patch, call, ANY
from django.contrib.auth import get_user_model
from django.core.management import call_command
from tenant_schemas.utils import tenant_context
from authentic2.saml.models import LibertyProvider
from authentic2.a2_rbac.models import Role, RoleAttribute
from authentic2.a2_rbac.utils import get_default_ou
from authentic2.models import Attribute, AttributeValue
from hobo.agent.authentic2.provisionning import provisionning
# Module-level pytest marker: applies the ``django_db`` mark to every test
# collected from this module.
pytestmark = pytest.mark.django_db
def _unwrap_role_notification(notification, expected_type):
    """Validate one role notification and return its single payload entry.

    ``notification`` is a ``mock.call`` record taken from the patched
    ``notify_agents``; ``expected_type`` is the value the message's
    ``@type`` key must have (``'provision'`` or ``'deprovision'``).

    The message envelope is checked in full and the only element of
    ``objects['data']`` is returned so callers can assert on its content.
    """
    # notify_agents must have received exactly one positional argument.
    assert notification == call(ANY)
    message = notification[0][0]
    assert isinstance(message, dict)
    assert set(message.keys()) == {'audience', '@type', 'objects', 'full'}
    assert message['audience'] == ['http://provider.com']
    assert message['@type'] == expected_type
    assert message['full'] is False
    objects = message['objects']
    assert isinstance(objects, dict)
    assert set(objects.keys()) == {'data', '@type'}
    assert objects['@type'] == 'role'
    data = objects['data']
    assert isinstance(data, list)
    assert len(data) == 1
    return data[0]


def test_provision_role(transactional_db, tenant, caplog):
    """Check the notifications sent when a role is created, updated, deleted.

    Each step runs inside ``provisionning`` and must result in exactly one
    call to the patched ``notify_agents``.
    """
    provisioned_keys = {'details', 'emails_to_members', 'description',
                        'uuid', 'name', 'slug', 'emails'}

    with patch('hobo.agent.authentic2.provisionning.notify_agents') as notify_agents:
        with tenant_context(tenant):
            LibertyProvider.objects.create(ou=get_default_ou(), name='provider',
                                           entity_id='http://provider.com',
                                           protocol_conformance=lasso.PROTOCOL_SAML_2_0)

            # Step 1: role creation.
            with provisionning:
                role = Role.objects.create(name='coin', ou=get_default_ou())

            assert notify_agents.call_count == 1
            o = _unwrap_role_notification(notify_agents.call_args, 'provision')
            assert set(o.keys()) == provisioned_keys
            assert o['details'] == ''
            assert o['emails_to_members'] is True
            assert o['emails'] == []
            notify_agents.reset_mock()

            # Step 2: adding role attributes re-provisions the role.
            emails = ['john.doe@example.com', 'toto@entrouvert.com']
            with provisionning:
                RoleAttribute.objects.create(
                    role=role, name='emails', kind='json',
                    value=json.dumps(emails))
                RoleAttribute.objects.create(
                    role=role, name='emails_to_members', kind='json',
                    value=json.dumps(True))

            assert notify_agents.call_count == 1
            o = _unwrap_role_notification(notify_agents.call_args, 'provision')
            assert set(o.keys()) == provisioned_keys
            assert o['details'] == ''
            assert o['emails_to_members'] is True
            assert o['emails'] == emails
            notify_agents.reset_mock()

            # Step 3: deletion sends a deprovision message carrying only the uuid.
            with provisionning:
                role.delete()

            assert notify_agents.call_count == 1
            o = _unwrap_role_notification(notify_agents.call_args, 'deprovision')
            assert set(o.keys()) == {'uuid'}
def _unwrap_user_notification(notification, tenant, expected_type, expected_count):
    """Validate the envelope of a user notification and return its data list.

    ``notification`` is a ``mock.call`` record taken from the patched
    ``notify_agents``.  ``expected_type`` is the required value of the
    message's ``@type`` key and ``expected_count`` the required length of
    ``objects['data']``.  ``tenant`` provides the domain used to build the
    expected ``issuer`` URL.
    """
    # notify_agents must have received exactly one positional argument.
    assert notification == call(ANY)
    message = notification[0][0]
    assert isinstance(message, dict)
    assert set(message.keys()) == {'issuer', 'audience', '@type', 'objects', 'full'}
    assert message['issuer'] == \
        'http://%s/idp/saml2/metadata' % tenant.domain_url
    assert message['audience'] == ['http://provider.com']
    assert message['@type'] == expected_type
    assert message['full'] is False
    objects = message['objects']
    assert isinstance(objects, dict)
    assert set(objects.keys()) == {'data', '@type'}
    assert objects['@type'] == 'user'
    data = objects['data']
    assert isinstance(data, list)
    assert len(data) == expected_count
    return data


def _check_user_entry(entry, user, extra_keys=()):
    """Assert that a provisioned user dict matches the identity fields of ``user``.

    ``extra_keys`` lists additional keys that must be present in ``entry``
    on top of the standard ones.  Roles and ``is_superuser`` vary between
    scenarios and are left to the caller.
    """
    required_keys = {'uuid', 'username', 'first_name', 'is_superuser',
                     'last_name', 'email', 'roles'}
    required_keys.update(extra_keys)
    assert set(entry.keys()) >= required_keys
    assert entry['uuid'] == user.uuid
    assert entry['username'] == user.username
    assert entry['first_name'] == user.first_name
    assert entry['last_name'] == user.last_name
    assert entry['email'] == user.email


def _role_refs(roles):
    """Return the name/slug/uuid dicts expected in a user's ``roles`` list."""
    return [{'name': r.name, 'slug': r.slug, 'uuid': r.uuid} for r in roles]


def test_provision_user(transactional_db, tenant, caplog):
    """Check user notifications across the life cycle of two users.

    Covered steps: creation, save with a new attribute value, attribute
    value deletion, role membership add/remove, role inheritance add/remove
    and finally deletion of both users.
    """
    with patch('hobo.agent.authentic2.provisionning.notify_agents') as notify_agents:
        with tenant_context(tenant):
            service = LibertyProvider.objects.create(ou=get_default_ou(), name='provider',
                                                     entity_id='http://provider.com',
                                                     protocol_conformance=lasso.PROTOCOL_SAML_2_0)
            role = Role.objects.create(name='coin', service=service, ou=get_default_ou())
            role.attributes.create(kind='string', name='is_superuser', value='true')
            role2 = Role.objects.create(name='zob', service=service, ou=get_default_ou())
            role2.attributes.create(kind='json', name='emails', value='["zob@example.net"]')
            child_role = Role.objects.create(name='child', ou=get_default_ou())
            notify_agents.reset_mock()
            User = get_user_model()
            attribute = Attribute.objects.create(label='Code postal', name='code_postal',
                                                 kind='string')

            # Step 1: create two users, one of them member of role2.
            with provisionning:
                user1 = User.objects.create(username=u'Étienne',
                                            email='etienne.dugenou@example.net',
                                            first_name=u'Étienne',
                                            last_name=u'Dugenou',
                                            ou=get_default_ou())
                user2 = User.objects.create(username=u'john.doe',
                                            email='iohn.doe@example.net',
                                            first_name=u'John',
                                            last_name=u'Doe',
                                            ou=get_default_ou())
                role2.members.add(user2)
            users = {user.uuid: user for user in [user1, user2]}

            assert notify_agents.call_count == 2
            # The first message is about roles, the last one about users.
            assert notify_agents.call_args_list[0][0][0]['objects']['@type'] == 'role'
            data = _unwrap_user_notification(notify_agents.call_args, tenant, 'provision', 2)
            assert len(set([o['uuid'] for o in data])) == 2
            for o in data:
                assert o['uuid'] in users
                user = users[o['uuid']]
                _check_user_entry(o, user)
                assert o['roles'] == _role_refs(user.roles.all())
                assert o['is_superuser'] is False
            notify_agents.reset_mock()

            # Step 2: save both users after setting an attribute value and
            # the superuser flag on user1.
            attribute.set_value(user1, '13400')
            user1.is_superuser = True
            with provisionning:
                user1.save()
                user2.save()

            assert notify_agents.call_count == 1
            data = _unwrap_user_notification(notify_agents.call_args, tenant, 'provision', 2)
            for o, user in zip(data, [user1, user2]):
                _check_user_entry(o, user, extra_keys=['code_postal'])
                assert o['roles'] == _role_refs(user.roles.all())
                assert o['code_postal'] is None or o['code_postal'] == '13400'
                assert o['is_superuser'] is user.is_superuser
            notify_agents.reset_mock()

            # Step 3: deleting the attribute value re-provisions user1 only.
            with provisionning:
                AttributeValue.objects.get(attribute=attribute).delete()

            assert notify_agents.call_count == 1
            data = _unwrap_user_notification(notify_agents.call_args, tenant, 'provision', 1)
            for o in data:
                _check_user_entry(o, user1)
                assert o['roles'] == []
                assert o['is_superuser'] is True
            user1.is_superuser = False
            user1.save()
            notify_agents.reset_mock()

            # Step 4: add user1 to `role` and save user2; one role message
            # followed by one message per user is expected.
            with provisionning:
                role.members.add(user1)
                user2.save()

            assert notify_agents.call_count == 3
            assert notify_agents.call_args_list[0][0][0]['objects']['@type'] == 'role'
            for notification in notify_agents.call_args_list[1:]:
                data = _unwrap_user_notification(notification, tenant, 'provision', 1)
                for o in data:
                    assert o['uuid'] in users
                    user = users[o['uuid']]
                    _check_user_entry(o, user)
                    assert o['roles'] == _role_refs(user.roles.all())
                    # Only user1 is expected to be reported as superuser --
                    # presumably through the is_superuser attribute of `role`.
                    assert o['is_superuser'] is (user == user1)
            # The two user messages must concern two distinct users.
            assert len(set(arg[0][0]['objects']['data'][0]['uuid'] for arg in
                           notify_agents.call_args_list[1:])) == 2
            notify_agents.reset_mock()

            # Step 5: remove user1 from `role`.
            with provisionning:
                user1.roles.remove(role)

            assert notify_agents.call_count == 2
            data = _unwrap_user_notification(notify_agents.call_args, tenant, 'provision', 1)
            for o in data:
                _check_user_entry(o, user1)
                assert o['roles'] == []
                assert o['is_superuser'] is False
            notify_agents.reset_mock()

            # Step 6: user1 joins child_role, which becomes a child of `role`;
            # both roles must then be listed for user1.
            with provisionning:
                user1.roles.add(child_role)
                child_role.add_parent(role)

            assert notify_agents.call_count == 2
            data = _unwrap_user_notification(notify_agents.call_args, tenant, 'provision', 1)
            expected_roles = _role_refs([role, child_role])
            for o in data:
                _check_user_entry(o, user1)
                assert len(o['roles']) == 2
                for r in o['roles']:
                    assert r in expected_roles
                assert len(set(r['uuid'] for r in o['roles'])) == 2
                assert o['is_superuser'] is True
            notify_agents.reset_mock()

            # Step 7: break the inheritance; only child_role remains.
            with provisionning:
                child_role.remove_parent(role)

            assert notify_agents.call_count == 1
            data = _unwrap_user_notification(notify_agents.call_args, tenant, 'provision', 1)
            for o in data:
                _check_user_entry(o, user1)
                assert o['roles'] == _role_refs([child_role])
                assert o['is_superuser'] is False
            notify_agents.reset_mock()

            # Step 8: deleting both users sends a single deprovision message
            # whose entries carry only the uuid.
            with provisionning:
                user1.delete()
                user2.delete()

            assert notify_agents.call_count == 1
            data = _unwrap_user_notification(notify_agents.call_args, tenant, 'deprovision', 2)
            assert len(set([o['uuid'] for o in data])) == 2
            for o in data:
                assert set(o.keys()) == {'uuid'}
                assert o['uuid'] in users
def test_provision_createsuperuser(transactional_db, tenant, caplog):
    """Check that running ``createsuperuser`` notifies agents exactly once."""
    provider_fields = {
        'ou': None,
        'name': 'provider',
        'entity_id': 'http://provider.com',
        'protocol_conformance': lasso.PROTOCOL_SAML_2_0,
    }
    with tenant_context(tenant):
        # create a provider so notification messages have an audience.
        LibertyProvider.objects.create(**provider_fields)

    command_options = {
        'domain': tenant.domain_url,
        'uuid': 'coin',
        'username': 'coin',
        'email': 'coin@coin.org',
        'interactive': False,
    }
    with patch('hobo.agent.authentic2.provisionning.notify_agents') as mocked_notify:
        call_command('createsuperuser', **command_options)
        assert mocked_notify.call_count == 1
@patch('hobo.agent.authentic2.provisionning.notify_agents')
def test_command_hobo_provision(notify_agents, transactional_db, tenant, caplog):
    """Check the messages sent by the ``hobo_provision`` management command.

    Ten roles and ten users are created, then the command is run with
    ``roles=True, users=True``.  The first notification must be a full role
    provisioning and the second a non-full user provisioning, each carrying
    ten objects.
    """
    User = get_user_model()
    with tenant_context(tenant):
        ou = get_default_ou()
        LibertyProvider.objects.create(ou=ou, name='provider',
                                       entity_id='http://provider.com',
                                       protocol_conformance=lasso.PROTOCOL_SAML_2_0)
        for i in range(10):
            Role.objects.create(name='role-%s' % i, ou=ou)
        for i in range(10):
            # Interpolate the index so each user gets a distinct address
            # instead of the literal '%s' placeholder.
            User.objects.create(username='user-%s' % i, first_name='John',
                                last_name='Doe %s' % i, ou=ou,
                                email='jone.doe-%s@example.com' % i)

    with tenant_context(tenant):
        # call_command('tenant_command', 'hobo_provision', ...) doesn't work
        # https://github.com/bernardopires/django-tenant-schemas/issues/495
        # so we call the command from the tenant context.
        call_command('hobo_provision', roles=True, users=True)

    # Fail with a clear assertion rather than an IndexError if a message
    # is missing.
    assert notify_agents.call_count >= 2
    msg_1 = notify_agents.call_args_list[0][0][0]
    msg_2 = notify_agents.call_args_list[1][0][0]

    assert msg_1['@type'] == 'provision'
    assert msg_1['full'] is True
    assert msg_1['objects']['@type'] == 'role'
    assert len(msg_1['objects']['data']) == 10

    assert msg_2['@type'] == 'provision'
    assert msg_2['full'] is False
    assert msg_2['objects']['@type'] == 'user'
    assert len(msg_2['objects']['data']) == 10