489 lines
22 KiB
Python
489 lines
22 KiB
Python
# -*- coding: utf-8 -*-
|
|
import json
|
|
|
|
import pytest
|
|
import lasso
|
|
|
|
from mock import patch, call, ANY
|
|
|
|
from django.contrib.auth import get_user_model
|
|
from django.core.management import call_command
|
|
|
|
from tenant_schemas.utils import tenant_context
|
|
|
|
from authentic2.saml.models import LibertyProvider
|
|
from authentic2.a2_rbac.models import Role, RoleAttribute
|
|
from authentic2.a2_rbac.utils import get_default_ou
|
|
from authentic2.models import Attribute, AttributeValue
|
|
from hobo.agent.authentic2.provisionning import provisionning
|
|
|
|
pytestmark = pytest.mark.django_db
|
|
|
|
|
|
def test_provision_role(transactional_db, tenant, caplog):
|
|
with patch('hobo.agent.authentic2.provisionning.notify_agents') as notify_agents:
|
|
with tenant_context(tenant):
|
|
LibertyProvider.objects.create(ou=get_default_ou(), name='provider',
|
|
entity_id='http://provider.com',
|
|
protocol_conformance=lasso.PROTOCOL_SAML_2_0)
|
|
with provisionning:
|
|
role = Role.objects.create(name='coin', ou=get_default_ou())
|
|
|
|
assert notify_agents.call_count == 1
|
|
arg = notify_agents.call_args
|
|
assert arg == call(ANY)
|
|
arg = arg[0][0]
|
|
assert isinstance(arg, dict)
|
|
assert set(arg.keys()) == set([
|
|
'audience', '@type', 'objects', 'full'])
|
|
assert arg['audience'] == ['http://provider.com']
|
|
assert arg['@type'] == 'provision'
|
|
assert arg['full'] is False
|
|
objects = arg['objects']
|
|
assert isinstance(objects, dict)
|
|
assert set(objects.keys()) == set(['data', '@type'])
|
|
assert objects['@type'] == 'role'
|
|
data = objects['data']
|
|
assert isinstance(data, list)
|
|
assert len(data) == 1
|
|
o = data[0]
|
|
assert set(o.keys()) == set(['details', 'emails_to_members',
|
|
'description', 'uuid', 'name',
|
|
'slug', 'emails'])
|
|
assert o['details'] == ''
|
|
assert o['emails_to_members'] is True
|
|
assert o['emails'] == []
|
|
|
|
notify_agents.reset_mock()
|
|
emails = ['john.doe@example.com', 'toto@entrouvert.com']
|
|
with provisionning:
|
|
RoleAttribute.objects.create(
|
|
role=role, name='emails', kind='json',
|
|
value=json.dumps(emails))
|
|
RoleAttribute.objects.create(
|
|
role=role, name='emails_to_members', kind='json',
|
|
value=json.dumps(True))
|
|
|
|
assert notify_agents.call_count == 1
|
|
arg = notify_agents.call_args
|
|
assert arg == call(ANY)
|
|
arg = arg[0][0]
|
|
assert isinstance(arg, dict)
|
|
assert set(arg.keys()) == set([
|
|
'audience', '@type', 'objects', 'full'])
|
|
assert arg['audience'] == ['http://provider.com']
|
|
assert arg['@type'] == 'provision'
|
|
assert arg['full'] is False
|
|
objects = arg['objects']
|
|
assert isinstance(objects, dict)
|
|
assert set(objects.keys()) == set(['data', '@type'])
|
|
assert objects['@type'] == 'role'
|
|
data = objects['data']
|
|
assert isinstance(data, list)
|
|
assert len(data) == 1
|
|
o = data[0]
|
|
assert set(o.keys()) == set(['details', 'emails_to_members',
|
|
'description', 'uuid', 'name',
|
|
'slug', 'emails'])
|
|
assert o['details'] == ''
|
|
assert o['emails_to_members'] is True
|
|
assert o['emails'] == emails
|
|
|
|
notify_agents.reset_mock()
|
|
with provisionning:
|
|
role.delete()
|
|
|
|
assert notify_agents.call_count == 1
|
|
arg = notify_agents.call_args
|
|
assert arg == call(ANY)
|
|
arg = arg[0][0]
|
|
assert isinstance(arg, dict)
|
|
assert set(arg.keys()) == set([
|
|
'audience', '@type', 'objects', 'full'])
|
|
assert arg['audience'] == ['http://provider.com']
|
|
assert arg['@type'] == 'deprovision'
|
|
assert arg['full'] is False
|
|
objects = arg['objects']
|
|
assert isinstance(objects, dict)
|
|
assert set(objects.keys()) == set(['data', '@type'])
|
|
assert objects['@type'] == 'role'
|
|
data = objects['data']
|
|
assert isinstance(data, list)
|
|
assert len(data) == 1
|
|
o = data[0]
|
|
assert set(o.keys()) == set(['uuid'])
|
|
|
|
|
|
def test_provision_user(transactional_db, tenant, caplog):
|
|
with patch('hobo.agent.authentic2.provisionning.notify_agents') as notify_agents:
|
|
with tenant_context(tenant):
|
|
service = LibertyProvider.objects.create(ou=get_default_ou(), name='provider',
|
|
entity_id='http://provider.com',
|
|
protocol_conformance=lasso.PROTOCOL_SAML_2_0)
|
|
role = Role.objects.create(name='coin', service=service, ou=get_default_ou())
|
|
role.attributes.create(kind='string', name='is_superuser', value='true')
|
|
role2 = Role.objects.create(name='zob', service=service, ou=get_default_ou())
|
|
role2.attributes.create(kind='json', name='emails', value='["zob@example.net"]')
|
|
child_role = Role.objects.create(name='child', ou=get_default_ou())
|
|
notify_agents.reset_mock()
|
|
User = get_user_model()
|
|
attribute = Attribute.objects.create(label='Code postal', name='code_postal',
|
|
kind='string')
|
|
with provisionning:
|
|
user1 = User.objects.create(username=u'Étienne',
|
|
email='etienne.dugenou@example.net',
|
|
first_name=u'Étienne',
|
|
last_name=u'Dugenou',
|
|
ou=get_default_ou())
|
|
user2 = User.objects.create(username=u'john.doe',
|
|
email='iohn.doe@example.net',
|
|
first_name=u'John',
|
|
last_name=u'Doe',
|
|
ou=get_default_ou())
|
|
role2.members.add(user2)
|
|
users = {user.uuid: user for user in [user1, user2]}
|
|
assert notify_agents.call_count == 2
|
|
assert notify_agents.call_args_list[0][0][0]['objects']['@type'] == 'role'
|
|
arg = notify_agents.call_args
|
|
assert arg == call(ANY)
|
|
arg = arg[0][0]
|
|
assert isinstance(arg, dict)
|
|
assert set(arg.keys()) == set([
|
|
'issuer', 'audience', '@type', 'objects', 'full'])
|
|
assert arg['issuer'] == \
|
|
'http://%s/idp/saml2/metadata' % tenant.domain_url
|
|
assert arg['audience'] == ['http://provider.com']
|
|
assert arg['@type'] == 'provision'
|
|
assert arg['full'] is False
|
|
objects = arg['objects']
|
|
assert isinstance(objects, dict)
|
|
assert set(objects.keys()) == set(['data', '@type'])
|
|
assert objects['@type'] == 'user'
|
|
data = objects['data']
|
|
assert isinstance(data, list)
|
|
assert len(data) == 2
|
|
assert len(set([o['uuid'] for o in data])) == 2
|
|
for o in data:
|
|
assert set(o.keys()) >= set(['uuid', 'username', 'first_name', 'is_superuser',
|
|
'last_name', 'email', 'roles'])
|
|
assert o['uuid'] in users
|
|
user = users[o['uuid']]
|
|
assert o['username'] == user.username
|
|
assert o['first_name'] == user.first_name
|
|
assert o['last_name'] == user.last_name
|
|
assert o['email'] == user.email
|
|
assert o['roles'] == [{'name': r.name, 'slug': r.slug, 'uuid': r.uuid} for r in
|
|
user.roles.all()]
|
|
assert o['is_superuser'] is False
|
|
|
|
notify_agents.reset_mock()
|
|
attribute.set_value(user1, '13400')
|
|
user1.is_superuser = True
|
|
with provisionning:
|
|
user1.save()
|
|
user2.save()
|
|
|
|
assert notify_agents.call_count == 1
|
|
arg = notify_agents.call_args
|
|
assert arg == call(ANY)
|
|
arg = arg[0][0]
|
|
assert isinstance(arg, dict)
|
|
assert set(arg.keys()) == set([
|
|
'issuer', 'audience', '@type', 'objects', 'full'])
|
|
assert arg['issuer'] == \
|
|
'http://%s/idp/saml2/metadata' % tenant.domain_url
|
|
assert arg['audience'] == ['http://provider.com']
|
|
assert arg['@type'] == 'provision'
|
|
assert arg['full'] is False
|
|
objects = arg['objects']
|
|
assert isinstance(objects, dict)
|
|
assert set(objects.keys()) == set(['data', '@type'])
|
|
assert objects['@type'] == 'user'
|
|
data = objects['data']
|
|
assert isinstance(data, list)
|
|
assert len(data) == 2
|
|
for o, user in zip(data, [user1, user2]):
|
|
assert set(o.keys()) >= set(['code_postal', 'uuid', 'username', 'first_name',
|
|
'is_superuser', 'last_name', 'email', 'roles'])
|
|
assert o['uuid'] == user.uuid
|
|
assert o['username'] == user.username
|
|
assert o['first_name'] == user.first_name
|
|
assert o['last_name'] == user.last_name
|
|
assert o['email'] == user.email
|
|
assert o['roles'] == [{'name': r.name, 'slug': r.slug, 'uuid': r.uuid} for r in
|
|
user.roles.all()]
|
|
assert o['code_postal'] is None or o['code_postal'] == '13400'
|
|
assert o['is_superuser'] is user.is_superuser
|
|
|
|
notify_agents.reset_mock()
|
|
with provisionning:
|
|
AttributeValue.objects.get(attribute=attribute).delete()
|
|
|
|
assert notify_agents.call_count == 1
|
|
arg = notify_agents.call_args
|
|
assert arg == call(ANY)
|
|
arg = arg[0][0]
|
|
assert isinstance(arg, dict)
|
|
assert set(arg.keys()) == set([
|
|
'issuer', 'audience', '@type', 'objects', 'full'])
|
|
assert arg['issuer'] == \
|
|
'http://%s/idp/saml2/metadata' % tenant.domain_url
|
|
assert arg['audience'] == ['http://provider.com']
|
|
assert arg['@type'] == 'provision'
|
|
assert arg['full'] is False
|
|
objects = arg['objects']
|
|
assert isinstance(objects, dict)
|
|
assert set(objects.keys()) == set(['data', '@type'])
|
|
assert objects['@type'] == 'user'
|
|
data = objects['data']
|
|
assert isinstance(data, list)
|
|
assert len(data) == 1
|
|
for o in data:
|
|
assert set(o.keys()) >= set(['uuid', 'username', 'first_name',
|
|
'is_superuser', 'last_name', 'email', 'roles'])
|
|
assert o['uuid'] == user1.uuid
|
|
assert o['username'] == user1.username
|
|
assert o['first_name'] == user1.first_name
|
|
assert o['last_name'] == user1.last_name
|
|
assert o['email'] == user1.email
|
|
assert o['roles'] == []
|
|
assert o['is_superuser'] is True
|
|
|
|
user1.is_superuser = False
|
|
user1.save()
|
|
|
|
notify_agents.reset_mock()
|
|
with provisionning:
|
|
role.members.add(user1)
|
|
user2.save()
|
|
|
|
assert notify_agents.call_count == 3
|
|
assert notify_agents.call_args_list[0][0][0]['objects']['@type'] == 'role'
|
|
for arg in notify_agents.call_args_list[1:]:
|
|
assert arg == call(ANY)
|
|
arg = arg[0][0]
|
|
assert isinstance(arg, dict)
|
|
assert set(arg.keys()) == set([
|
|
'issuer', 'audience', '@type', 'objects', 'full'])
|
|
assert arg['issuer'] == \
|
|
'http://%s/idp/saml2/metadata' % tenant.domain_url
|
|
assert arg['audience'] == ['http://provider.com']
|
|
assert arg['@type'] == 'provision'
|
|
assert arg['full'] is False
|
|
objects = arg['objects']
|
|
assert isinstance(objects, dict)
|
|
assert set(objects.keys()) == set(['data', '@type'])
|
|
assert objects['@type'] == 'user'
|
|
data = objects['data']
|
|
assert isinstance(data, list)
|
|
assert len(data) == 1
|
|
print data
|
|
for o in data:
|
|
assert set(o.keys()) >= set(['uuid', 'username', 'first_name',
|
|
'is_superuser', 'last_name', 'email', 'roles'])
|
|
assert o['uuid'] in users
|
|
user = users[o['uuid']]
|
|
assert o['uuid'] == user.uuid
|
|
assert o['username'] == user.username
|
|
assert o['first_name'] == user.first_name
|
|
assert o['last_name'] == user.last_name
|
|
assert o['email'] == user.email
|
|
assert o['roles'] == [{'name': r.name, 'slug': r.slug, 'uuid': r.uuid} for r in
|
|
user.roles.all()]
|
|
assert o['is_superuser'] is (user == user1)
|
|
assert len(set(arg[0][0]['objects']['data'][0]['uuid'] for arg in
|
|
notify_agents.call_args_list[1:])) == 2
|
|
|
|
notify_agents.reset_mock()
|
|
with provisionning:
|
|
user1.roles.remove(role)
|
|
|
|
assert notify_agents.call_count == 2
|
|
arg = notify_agents.call_args
|
|
assert arg == call(ANY)
|
|
arg = arg[0][0]
|
|
assert isinstance(arg, dict)
|
|
assert set(arg.keys()) == set([
|
|
'issuer', 'audience', '@type', 'objects', 'full'])
|
|
assert arg['issuer'] == \
|
|
'http://%s/idp/saml2/metadata' % tenant.domain_url
|
|
assert arg['audience'] == ['http://provider.com']
|
|
assert arg['@type'] == 'provision'
|
|
assert arg['full'] is False
|
|
objects = arg['objects']
|
|
assert isinstance(objects, dict)
|
|
assert set(objects.keys()) == set(['data', '@type'])
|
|
assert objects['@type'] == 'user'
|
|
data = objects['data']
|
|
assert isinstance(data, list)
|
|
assert len(data) == 1
|
|
for o in data:
|
|
assert set(o.keys()) >= set(['uuid', 'username', 'first_name',
|
|
'is_superuser', 'last_name', 'email', 'roles'])
|
|
assert o['uuid'] == user1.uuid
|
|
assert o['username'] == user1.username
|
|
assert o['first_name'] == user1.first_name
|
|
assert o['last_name'] == user1.last_name
|
|
assert o['email'] == user1.email
|
|
assert o['roles'] == []
|
|
assert o['is_superuser'] is False
|
|
|
|
notify_agents.reset_mock()
|
|
with provisionning:
|
|
user1.roles.add(child_role)
|
|
child_role.add_parent(role)
|
|
|
|
assert notify_agents.call_count == 2
|
|
arg = notify_agents.call_args
|
|
assert arg == call(ANY)
|
|
arg = arg[0][0]
|
|
assert isinstance(arg, dict)
|
|
assert set(arg.keys()) == set([
|
|
'issuer', 'audience', '@type', 'objects', 'full'])
|
|
assert arg['issuer'] == \
|
|
'http://%s/idp/saml2/metadata' % tenant.domain_url
|
|
assert arg['audience'] == ['http://provider.com']
|
|
assert arg['@type'] == 'provision'
|
|
assert arg['full'] is False
|
|
objects = arg['objects']
|
|
assert isinstance(objects, dict)
|
|
assert set(objects.keys()) == set(['data', '@type'])
|
|
assert objects['@type'] == 'user'
|
|
data = objects['data']
|
|
assert isinstance(data, list)
|
|
assert len(data) == 1
|
|
for o in data:
|
|
assert set(o.keys()) >= set(['uuid', 'username', 'first_name',
|
|
'is_superuser', 'last_name', 'email', 'roles'])
|
|
assert o['uuid'] == user1.uuid
|
|
assert o['username'] == user1.username
|
|
assert o['first_name'] == user1.first_name
|
|
assert o['last_name'] == user1.last_name
|
|
assert o['email'] == user1.email
|
|
assert len(o['roles']) == 2
|
|
for r in o['roles']:
|
|
r1 = {
|
|
'uuid': role.uuid,
|
|
'name': role.name,
|
|
'slug': role.slug
|
|
}
|
|
r2 = {
|
|
'uuid': child_role.uuid,
|
|
'name': child_role.name,
|
|
'slug': child_role.slug
|
|
}
|
|
assert r == r1 or r == r2
|
|
assert len(set(r['uuid'] for r in o['roles'])) == 2
|
|
assert o['is_superuser'] is True
|
|
|
|
notify_agents.reset_mock()
|
|
with provisionning:
|
|
child_role.remove_parent(role)
|
|
|
|
assert notify_agents.call_count == 1
|
|
arg = notify_agents.call_args
|
|
assert arg == call(ANY)
|
|
arg = arg[0][0]
|
|
assert isinstance(arg, dict)
|
|
assert set(arg.keys()) == set([
|
|
'issuer', 'audience', '@type', 'objects', 'full'])
|
|
assert arg['issuer'] == \
|
|
'http://%s/idp/saml2/metadata' % tenant.domain_url
|
|
assert arg['audience'] == ['http://provider.com']
|
|
assert arg['@type'] == 'provision'
|
|
assert arg['full'] is False
|
|
objects = arg['objects']
|
|
assert isinstance(objects, dict)
|
|
assert set(objects.keys()) == set(['data', '@type'])
|
|
assert objects['@type'] == 'user'
|
|
data = objects['data']
|
|
assert isinstance(data, list)
|
|
assert len(data) == 1
|
|
for o in data:
|
|
assert set(o.keys()) >= set(['uuid', 'username', 'first_name',
|
|
'is_superuser', 'last_name', 'email', 'roles'])
|
|
assert o['uuid'] == user1.uuid
|
|
assert o['username'] == user1.username
|
|
assert o['first_name'] == user1.first_name
|
|
assert o['last_name'] == user1.last_name
|
|
assert o['email'] == user1.email
|
|
assert o['roles'] == [{
|
|
'uuid': child_role.uuid,
|
|
'name': child_role.name,
|
|
'slug': child_role.slug
|
|
}]
|
|
assert o['is_superuser'] is False
|
|
|
|
notify_agents.reset_mock()
|
|
with provisionning:
|
|
user1.delete()
|
|
user2.delete()
|
|
assert notify_agents.call_count == 1
|
|
arg = notify_agents.call_args
|
|
assert arg == call(ANY)
|
|
arg = arg[0][0]
|
|
assert isinstance(arg, dict)
|
|
assert set(arg.keys()) == set([
|
|
'issuer', 'audience', '@type', 'objects', 'full'])
|
|
assert arg['issuer'] == \
|
|
'http://%s/idp/saml2/metadata' % tenant.domain_url
|
|
assert arg['audience'] == ['http://provider.com']
|
|
assert arg['@type'] == 'deprovision'
|
|
assert arg['full'] is False
|
|
objects = arg['objects']
|
|
assert isinstance(objects, dict)
|
|
assert set(objects.keys()) == set(['data', '@type'])
|
|
assert objects['@type'] == 'user'
|
|
data = objects['data']
|
|
assert isinstance(data, list)
|
|
assert len(data) == 2
|
|
assert len(set([o['uuid'] for o in data])) == 2
|
|
for o in data:
|
|
assert set(o.keys()) == set(['uuid'])
|
|
assert o['uuid'] in users
|
|
|
|
|
|
def test_provision_createsuperuser(transactional_db, tenant, caplog):
|
|
with tenant_context(tenant):
|
|
# create a provider so notification messages have an audience.
|
|
LibertyProvider.objects.create(ou=None, name='provider',
|
|
entity_id='http://provider.com',
|
|
protocol_conformance=lasso.PROTOCOL_SAML_2_0)
|
|
with patch('hobo.agent.authentic2.provisionning.notify_agents') as notify_agents:
|
|
call_command('createsuperuser', domain=tenant.domain_url, uuid='coin',
|
|
username='coin', email='coin@coin.org', interactive=False)
|
|
assert notify_agents.call_count == 1
|
|
|
|
|
|
@patch('hobo.agent.authentic2.provisionning.notify_agents')
|
|
def test_command_hobo_provision(notify_agents, transactional_db, tenant, caplog):
|
|
User = get_user_model()
|
|
with tenant_context(tenant):
|
|
ou = get_default_ou()
|
|
LibertyProvider.objects.create(ou=ou, name='provider',
|
|
entity_id='http://provider.com',
|
|
protocol_conformance=lasso.PROTOCOL_SAML_2_0)
|
|
for i in range(10):
|
|
Role.objects.create(name='role-%s' % i, ou=ou)
|
|
for i in range(10):
|
|
User.objects.create(username='user-%s' % i, first_name='John',
|
|
last_name='Doe %s' % i, ou=ou,
|
|
email='jone.doe-%s@example.com')
|
|
|
|
with tenant_context(tenant):
|
|
# call_command('tenant_command', 'hobo_provision', ...) doesn't work
|
|
# https://github.com/bernardopires/django-tenant-schemas/issues/495
|
|
# so we call the command from the tenant context.
|
|
call_command('hobo_provision', roles=True, users=True)
|
|
|
|
msg_1 = notify_agents.call_args_list[0][0][0]
|
|
msg_2 = notify_agents.call_args_list[1][0][0]
|
|
assert msg_1['@type'] == 'provision'
|
|
assert msg_1['full'] is True
|
|
assert msg_1['objects']['@type'] == 'role'
|
|
assert len(msg_1['objects']['data']) == 10
|
|
assert msg_2['@type'] == 'provision'
|
|
assert msg_2['full'] is False
|
|
assert msg_2['objects']['@type'] == 'user'
|
|
assert len(msg_2['objects']['data']) == 10
|