wcs/tests/test_hobo_notify.py

852 lines
28 KiB
Python

# -*- coding: utf-8 -*-
import uuid
import pytest
from wcs.ctl.hobo_notify import CmdHoboNotify
from wcs.qommon import force_str
from wcs.qommon import storage as st
from wcs.qommon.http_request import HTTPRequest
from .utilities import create_temporary_pub
def pytest_generate_tests(metafunc):
    """Parametrize the ``pub`` fixture over the 'pickle' and 'sql' modes.

    Tests that do not request ``pub`` are left alone.
    """
    if 'pub' not in metafunc.fixturenames:
        return
    metafunc.parametrize('pub', ['pickle', 'sql'], indirect=True)
@pytest.fixture
def pub(request):
    """Return a temporary publisher holding a single pre-existing role.

    ``request.param`` ('pickle' or 'sql') decides whether the publisher is
    created with ``sql_mode`` enabled.  The SAML provider id is set to
    'test' and the role store is reset to one role named
    'Service étt civil' with slug 'service-ett-civil'.
    """
    use_sql = request.param == 'sql'
    publisher = create_temporary_pub(sql_mode=use_sql)
    publisher.set_app_dir(HTTPRequest(None, {'SCRIPT_NAME': '/', 'SERVER_NAME': 'example.net'}))
    publisher.cfg['sp'] = {'saml2_providerid': 'test'}
    publisher.write_cfg()

    # start from a known state: exactly one role
    publisher.role_class.wipe()
    seed_role = publisher.role_class(name='Service étt civil')
    seed_role.slug = 'service-ett-civil'
    seed_role.store()
    return publisher
def test_process_notification_role_wrong_audience(pub):
    """A role provision sent to audience 'coin' must leave roles unchanged."""

    def assert_only_initial_role():
        # the single role created by the fixture, with no extra attributes
        assert pub.role_class.count() == 1
        assert pub.role_class.select()[0].name == 'Service étt civil'
        assert pub.role_class.select()[0].slug == 'service-ett-civil'
        assert pub.role_class.select()[0].details is None
        assert pub.role_class.select()[0].emails is None
        assert pub.role_class.select()[0].emails_to_members is False

    role_payloads = [
        {
            'name': u'Service enfance',
            'slug': u'service-enfance',
            'description': u'Rôle du service petite enfance',
            'uuid': str(uuid.uuid4()),
            'emails': [u'petite-enfance@example.com'],
            'emails_to_members': False,
        },
        {
            '@type': 'role',
            'name': u'Service état civil',
            'slug': u'service-etat-civil',
            'description': u'Rôle du service état civil',
            'uuid': str(uuid.uuid4()),
            'emails': [u'etat-civil@example.com'],
            'emails_to_members': True,
        },
    ]
    notification = {
        '@type': u'provision',
        'audience': [u'coin'],
        'full': True,
        'objects': {
            '@type': 'role',
            'data': role_payloads,
        },
    }

    assert_only_initial_role()
    CmdHoboNotify.process_notification(notification)
    assert_only_initial_role()
def test_process_notification_role(pub):
    """Full role provisioning updates, creates and removes roles.

    The first notification carries two roles: one reusing the slug of the
    existing role (which gets updated in place) and a new one.  The second
    notification only carries the new role, after which it is the only
    role left.
    """
    enfance_uuid = str(uuid.uuid4())
    etat_civil_uuid = str(uuid.uuid4())

    def sole_role():
        return pub.role_class.select()[0]

    first_batch = [
        {
            'name': u'Service enfance',
            'slug': u'service-enfance',
            'details': u'Rôle du service petite enfance',
            'uuid': enfance_uuid,
            'emails': [u'petite-enfance@example.com'],
            'emails_to_members': False,
        },
        {
            'name': u'Service état civil',
            'slug': u'service-ett-civil',
            'details': u'Rôle du service état civil',
            'uuid': etat_civil_uuid,
            'emails': [u'etat-civil@example.com'],
            'emails_to_members': True,
        },
    ]
    notification = {
        '@type': u'provision',
        'audience': [u'test'],
        'full': True,
        'objects': {
            '@type': 'role',
            'data': first_batch,
        },
    }

    # initial state: only the role created by the fixture
    assert pub.role_class.count() == 1
    assert sole_role().name == 'Service étt civil'
    assert sole_role().slug == 'service-ett-civil'
    assert sole_role().details is None
    assert sole_role().emails is None
    assert sole_role().emails_to_members is False
    initial_role_id = sole_role().id

    CmdHoboNotify.process_notification(notification)
    assert pub.role_class.count() == 2

    # the pre-existing role kept its id and received the notified values
    updated_role = pub.role_class.get(initial_role_id)
    assert updated_role.name == 'Service état civil'
    assert updated_role.uuid == etat_civil_uuid
    assert updated_role.slug == 'service-ett-civil'
    assert updated_role.details == "Rôle du service état civil"
    assert updated_role.emails == ['etat-civil@example.com']
    assert updated_role.emails_to_members is True

    created_role = pub.role_class.get_on_index(enfance_uuid, 'uuid')
    assert created_role.id == enfance_uuid
    assert created_role.name == 'Service enfance'
    assert created_role.slug == 'service-enfance'
    assert created_role.uuid == enfance_uuid
    assert created_role.details == "Rôle du service petite enfance"
    assert created_role.emails == ['petite-enfance@example.com']
    assert created_role.emails_to_members is False

    # second full provision: a single role, sent with 'description'
    # instead of 'details'
    notification = {
        '@type': u'provision',
        'audience': [u'test'],
        'full': True,
        'objects': {
            '@type': 'role',
            'data': [
                {
                    '@type': 'role',
                    'name': u'Service enfance',
                    'slug': u'service-enfance',
                    'description': u'Rôle du service petite enfance',
                    'uuid': enfance_uuid,
                    'emails': [u'petite-enfance@example.com'],
                    'emails_to_members': True,
                },
            ],
        },
    }
    CmdHoboNotify.process_notification(notification)
    assert pub.role_class.count() == 1
    assert sole_role().id == created_role.id
    assert sole_role().uuid == enfance_uuid
    assert sole_role().name == 'Service enfance'
    assert sole_role().slug == 'service-enfance'
    assert sole_role().details is None
    assert sole_role().emails == ['petite-enfance@example.com']
    assert sole_role().emails_to_members is True
def test_process_notification_internal_role(pub):
    """A provisioned role whose slug starts with '_' reports is_internal()."""
    pub.role_class.wipe()

    internal_role_payload = {
        'name': u'Service enfance',
        'slug': u'_service-enfance',
        'details': u'Rôle du service petite enfance',
        'uuid': str(uuid.uuid4()),
        'emails': [u'petite-enfance@example.com'],
        'emails_to_members': False,
    }
    notification = {
        '@type': u'provision',
        'audience': [u'test'],
        'full': True,
        'objects': {
            '@type': 'role',
            'data': [internal_role_payload],
        },
    }
    CmdHoboNotify.process_notification(notification)

    assert pub.role_class.count() == 1
    provisioned = pub.role_class.select()[0]
    assert provisioned.is_internal()
def test_process_notification_role_description(pub):
    """Check that 'description' values are not used to fill role.details."""
    enfance_uuid = str(uuid.uuid4())
    etat_civil_uuid = str(uuid.uuid4())

    def sole_role():
        return pub.role_class.select()[0]

    first_batch = [
        {
            'name': u'Service enfance',
            'slug': u'service-enfance',
            'description': u'Rôle du service petite enfance',
            'uuid': enfance_uuid,
            'emails': [u'petite-enfance@example.com'],
            'emails_to_members': False,
        },
        {
            'name': u'Service état civil',
            'slug': u'service-ett-civil',
            'description': u'Rôle du service état civil',
            'uuid': etat_civil_uuid,
            'emails': [u'etat-civil@example.com'],
            'emails_to_members': True,
        },
    ]
    notification = {
        '@type': u'provision',
        'audience': [u'test'],
        'full': True,
        'objects': {
            '@type': 'role',
            'data': first_batch,
        },
    }

    # initial state: only the role created by the fixture
    assert pub.role_class.count() == 1
    assert sole_role().name == 'Service étt civil'
    assert sole_role().slug == 'service-ett-civil'
    assert sole_role().details is None
    assert sole_role().emails is None
    assert sole_role().emails_to_members is False
    initial_role_id = sole_role().id

    CmdHoboNotify.process_notification(notification)
    assert pub.role_class.count() == 2

    updated_role = pub.role_class.get(initial_role_id)
    assert updated_role.name == 'Service état civil'
    assert updated_role.slug == 'service-ett-civil'
    assert updated_role.uuid == etat_civil_uuid
    assert updated_role.details is None
    assert updated_role.emails == ['etat-civil@example.com']
    assert updated_role.emails_to_members is True

    created_role = pub.role_class.get_on_index(enfance_uuid, 'uuid')
    assert created_role.name == 'Service enfance'
    assert created_role.slug == 'service-enfance'
    assert created_role.uuid == enfance_uuid
    assert created_role.details is None
    assert created_role.emails == ['petite-enfance@example.com']
    assert created_role.emails_to_members is False

    # second full provision carrying only the newly created role
    notification = {
        '@type': u'provision',
        'audience': [u'test'],
        'full': True,
        'objects': {
            '@type': 'role',
            'data': [
                {
                    '@type': 'role',
                    'name': u'Service enfance',
                    'slug': u'service-enfance',
                    'description': u'Rôle du service petite enfance',
                    'uuid': enfance_uuid,
                    'emails': [u'petite-enfance@example.com'],
                    'emails_to_members': True,
                },
            ],
        },
    }
    CmdHoboNotify.process_notification(notification)
    assert pub.role_class.count() == 1
    assert sole_role().id == created_role.id
    assert sole_role().name == 'Service enfance'
    assert sole_role().uuid == enfance_uuid
    assert sole_role().slug == 'service-enfance'
    assert sole_role().details is None
    assert sole_role().emails == ['petite-enfance@example.com']
    assert sole_role().emails_to_members is True
def test_process_notification_role_deprovision(pub):
    """Deprovisioning removes the targeted role and keeps the other one.

    The same notification is processed twice: first against a role whose
    ``id`` is the notified uuid, then against a role whose ``uuid``
    attribute is the notified uuid.
    """
    target_uuid = str(uuid.uuid4())
    notification = {
        '@type': u'deprovision',
        'audience': [u'test'],
        'full': False,
        'objects': {
            '@type': 'role',
            'data': [
                {
                    '@type': 'role',
                    'uuid': target_uuid,
                },
            ],
        },
    }

    # re-store the fixture role under the notified uuid as its id
    target_role = pub.role_class.select()[0]
    target_role.remove_self()
    assert target_role.name == 'Service étt civil'
    assert target_role.slug == 'service-ett-civil'
    target_role.id = target_uuid
    target_role.store()

    # a second, unrelated role that must survive
    bystander = pub.role_class('foo')
    bystander.slug = 'bar'
    bystander.store()

    assert pub.role_class.count() == 2
    CmdHoboNotify.process_notification(notification)
    assert pub.role_class.count() == 1
    assert pub.role_class.select()[0].slug == 'bar'

    # same scenario, this time the role carries the value in its uuid attribute
    uuid_role = pub.role_class(name='Service étt civil')
    uuid_role.uuid = target_uuid
    uuid_role.store()

    assert pub.role_class.count() == 2
    CmdHoboNotify.process_notification(notification)
    assert pub.role_class.count() == 1
    assert pub.role_class.select()[0].slug == 'bar'
# Profile definition passed to CmdCheckHobos().update_profile() by the tests.
# All fields share an empty description and are user visible/editable; only
# the values listed in the table below vary from one field to the next.
PROFILE = {
    'fields': [
        {
            'kind': kind,
            'description': '',
            'required': required,
            'user_visible': True,
            'label': label,
            'disabled': disabled,
            'user_editable': True,
            'asked_on_registration': asked_on_registration,
            'name': name,
        }
        for kind, name, label, required, disabled, asked_on_registration in (
            # kind, name, label, required, disabled, asked_on_registration
            ('title', 'title', u'Civilité', False, False, False),
            ('string', 'first_name', u'Prénom', True, False, True),
            ('string', 'last_name', 'Nom', True, False, True),
            ('email', 'email', u'Adresse électronique', True, False, False),
            ('string', 'address', 'Addresse', False, False, False),
            ('string', 'zipcode', 'Code postal', False, False, False),
            ('string', 'city', 'Commune', False, False, False),
            ('string', 'phone', u'Téléphone', False, False, False),
            ('string', 'mobile', 'Mobile', False, False, False),
            ('string', 'country', 'Pays', False, True, False),
            ('birthdate', 'birthdate', 'Date de naissance', False, False, False),
        )
    ]
}
def test_process_notification_user_provision(pub):
    """Provision roles, then a user, and check the stored user attributes.

    The user is provisioned several times in a row to cover: initial
    creation, an update changing fields/roles/flags, invalid or empty
    birthdates, and missing email addresses.
    """
    user_class = pub.user_class

    from wcs.ctl.check_hobos import CmdCheckHobos

    # setup an hobo profile
    CmdCheckHobos().update_profile(PROFILE, pub)

    # create some roles
    enfance_uuid = str(uuid.uuid4())
    etat_civil_uuid = str(uuid.uuid4())
    notification = {
        '@type': u'provision',
        'audience': [u'test'],
        'full': True,
        'objects': {
            '@type': 'role',
            'data': [
                {
                    'name': u'Service enfance',
                    'slug': u'service-enfance',
                    'description': u'Rôle du service petite enfance',
                    'uuid': enfance_uuid,
                    'emails': [u'petite-enfance@example.com'],
                    'emails_to_members': False,
                },
                {
                    'name': u'Service état civil',
                    'slug': u'service-ett-civil',
                    'description': u'Rôle du service état civil',
                    'uuid': etat_civil_uuid,
                    'emails': [u'etat-civil@example.com'],
                    'emails_to_members': True,
                },
            ],
        },
    }
    assert pub.role_class.count() == 1
    assert pub.role_class.select()[0].name == 'Service étt civil'
    assert pub.role_class.select()[0].slug == 'service-ett-civil'
    assert pub.role_class.select()[0].details is None
    assert pub.role_class.select()[0].emails is None
    assert pub.role_class.select()[0].emails_to_members is False
    initial_role_id = pub.role_class.select()[0].id

    CmdHoboNotify.process_notification(notification)
    assert pub.role_class.count() == 2

    updated_role = pub.role_class.get(initial_role_id)
    assert updated_role.name == 'Service état civil'
    assert updated_role.uuid == etat_civil_uuid
    assert updated_role.slug == 'service-ett-civil'
    assert updated_role.details is None
    assert updated_role.emails == ['etat-civil@example.com']
    assert updated_role.emails_to_members is True

    created_role = pub.role_class.get_on_index(enfance_uuid, 'uuid')
    assert created_role.name == 'Service enfance'
    assert created_role.slug == 'service-enfance'
    assert created_role.uuid == enfance_uuid
    assert created_role.details is None
    assert created_role.emails == ['petite-enfance@example.com']
    assert created_role.emails_to_members is False

    # first user provision: active, not superuser, member of both roles
    notification = {
        u'@type': u'provision',
        u'issuer': 'http://idp.example.net/idp/saml/metadata',
        u'audience': [u'test'],
        u'objects': {
            u'@type': 'user',
            u'data': [
                {
                    u'uuid': u'a' * 32,
                    u'first_name': u'John',
                    u'last_name': u'Doé',
                    u'email': u'john.doe@example.net',
                    u'zipcode': u'13400',
                    u'is_superuser': False,
                    'is_active': True,
                    u'roles': [
                        {
                            u'uuid': enfance_uuid,
                            u'name': u'Service petite enfance',
                            u'description': u'etc.',
                        },
                        {
                            u'uuid': etat_civil_uuid,
                            u'name': u'Service état civil',
                            u'description': u'etc.',
                        },
                    ],
                }
            ],
        },
    }
    CmdHoboNotify.process_notification(notification)
    assert user_class.count() == 1
    stored_user = user_class.select()[0]
    assert stored_user.form_data is not None
    assert stored_user.form_data['_email'] == 'john.doe@example.net'
    assert stored_user.email == 'john.doe@example.net'
    assert stored_user.form_data['_first_name'] == 'John'
    assert stored_user.form_data['_last_name'] == force_str(u'Doé')
    assert stored_user.form_data['_zipcode'] == '13400'
    assert stored_user.form_data['_birthdate'] is None
    assert stored_user.name_identifiers == ['a' * 32]
    assert stored_user.is_admin is False
    assert stored_user.is_active is True
    assert set(stored_user.roles) == {created_role.id, updated_role.id}

    # second user provision: changed fields, superuser, inactive, and a
    # role list holding one known role plus one with a fresh random uuid
    notification = {
        u'@type': u'provision',
        u'issuer': 'http://idp.example.net/idp/saml/metadata',
        u'audience': [u'test'],
        u'objects': {
            u'@type': 'user',
            u'data': [
                {
                    u'uuid': u'a' * 32,
                    u'first_name': u'John',
                    u'last_name': u'Doe',
                    u'email': u'john.doe@example.net',
                    u'zipcode': u'13600',
                    u'birthdate': u'2000-01-01',
                    u'is_superuser': True,
                    'is_active': False,
                    u'roles': [
                        {
                            u'uuid': etat_civil_uuid,
                            u'name': u'Service état civil',
                            u'description': u'etc.',
                        },
                        {
                            u'uuid': str(uuid.uuid4()),
                            u'name': u'Service enfance',
                            u'description': u'',
                        },
                    ],
                }
            ],
        },
    }
    CmdHoboNotify.process_notification(notification)
    assert user_class.count() == 1
    stored_user = user_class.select()[0]
    assert stored_user.form_data is not None
    assert stored_user.form_data['_email'] == 'john.doe@example.net'
    assert stored_user.email == 'john.doe@example.net'
    assert stored_user.form_data['_first_name'] == 'John'
    assert stored_user.form_data['_last_name'] == 'Doe'
    assert stored_user.form_data['_zipcode'] == '13600'
    assert stored_user.form_data['_birthdate'].tm_year == 2000
    assert stored_user.name_identifiers == ['a' * 32]
    assert stored_user.is_admin is True
    assert stored_user.is_active is False
    assert set(stored_user.roles) == {updated_role.id}

    for birthdate_value in ('baddate', '', None):
        notification = {
            u'@type': u'provision',
            u'issuer': 'http://idp.example.net/idp/saml/metadata',
            u'audience': [u'test'],
            u'objects': {
                u'@type': 'user',
                u'data': [
                    {
                        u'uuid': u'a' * 32,
                        u'first_name': u'John',
                        u'last_name': u'Doe',
                        u'email': u'john.doe@example.net',
                        u'zipcode': u'13600',
                        u'birthdate': birthdate_value,
                        u'is_superuser': True,
                        u'roles': [
                            {
                                u'uuid': etat_civil_uuid,
                                u'name': u'Service état civil',
                                u'description': u'etc.',
                            },
                        ],
                    }
                ],
            },
        }
        CmdHoboNotify.process_notification(notification)
        assert user_class.count() == 1
        if birthdate_value in (None, ''):
            # empty value: the field is emptied
            assert user_class.select()[0].form_data['_birthdate'] is None
        else:
            # wrong value: do nothing, the previous birthdate is kept
            assert user_class.select()[0].form_data['_birthdate'].tm_year == 2000

    # check that provisioning an account with no email works
    for no_email in (None, ''):
        user_class.wipe()
        notification = {
            '@type': 'provision',
            'issuer': 'http://idp.example.net/idp/saml/metadata',
            'audience': ['test'],
            'objects': {
                '@type': 'user',
                'data': [
                    {
                        'uuid': 'a' * 32,
                        'first_name': 'John',
                        'last_name': 'Doé',
                        'email': no_email,
                        'zipcode': '13400',
                        'is_superuser': False,
                        'roles': [
                            {
                                'uuid': enfance_uuid,
                                'name': 'Service petite enfance',
                                'description': 'etc.',
                            },
                            {
                                'uuid': etat_civil_uuid,
                                'name': 'Service état civil',
                                'description': 'etc.',
                            },
                        ],
                    }
                ],
            },
        }
        CmdHoboNotify.process_notification(notification)
        assert user_class.count() == 1
        assert user_class.select()[0].first_name == 'John'
        assert not user_class.select()[0].email
def notify_of_exception(exc_info, context):
    """Raise an ``Exception`` wrapping ``exc_info``; ``context`` is ignored.

    Installed on the publisher by tests so that reported errors surface as
    exceptions they can inspect.
    """
    wrapped = Exception(exc_info)
    raise wrapped
def test_process_notification_user_with_errors(pub):
    """Invalid user notifications are rejected and create no user.

    Covers: 'full' provisioning of users (NotImplementedError), missing
    mandatory attributes on provision ('invalid user') and a missing uuid
    on deprovision ('user without uuid').
    """
    user_class = pub.user_class

    # setup an hobo profile
    from wcs.ctl.check_hobos import CmdCheckHobos

    user_class.wipe()
    pub.role_class.wipe()
    CmdCheckHobos().update_profile(PROFILE, pub)

    notification = {
        u'@type': u'provision',
        u'issuer': 'http://idp.example.net/idp/saml/metadata',
        u'audience': [u'test'],
        'full': True,
        u'objects': {
            u'@type': 'user',
            u'data': [
                {
                    u'uuid': u'a' * 32,
                    u'first_name': u'John',
                    u'last_name': u'Doe',
                    u'email': u'john.doe@example.net',
                    u'zipcode': u'13400',
                    u'is_superuser': False,
                    u'roles': [
                        {
                            u'uuid': u'xyz',
                            u'name': u'Service état civil',
                            u'description': u'etc.',
                        },
                    ],
                }
            ],
        },
    }
    with pytest.raises(NotImplementedError) as excinfo:
        CmdHoboNotify.process_notification(notification)
    assert excinfo.value.args == ('full is not supported for users',)
    assert user_class.count() == 0

    notification['full'] = False

    # make reported errors raise, wrapping the exc_info tuple
    pub.notify_of_exception = notify_of_exception

    # drop each mandatory attribute in turn, restoring it afterwards
    for key in ('uuid', 'first_name', 'last_name', 'email'):
        saved_value = notification['objects']['data'][0].pop(key)
        with pytest.raises(Exception) as excinfo:
            CmdHoboNotify.process_notification(notification)
        assert excinfo.value.args[0][0] == ValueError
        assert excinfo.value.args[0][1].args == ('invalid user',)
        assert user_class.count() == 0
        notification['objects']['data'][0][key] = saved_value

    notification['@type'] = 'deprovision'
    del notification['objects']['data'][0]['uuid']
    with pytest.raises(Exception) as excinfo:
        CmdHoboNotify.process_notification(notification)
    assert excinfo.value.args[0][0] == KeyError
    assert excinfo.value.args[0][1].args == ('user without uuid',)
def test_process_notification_role_with_errors(pub):
    """Role payloads lacking a uuid or a name are rejected, nothing is stored."""
    user_class = pub.user_class
    user_class.wipe()
    pub.role_class.wipe()

    # the payload deliberately has no 'uuid' key at first
    role_payload = {
        'name': u'Service enfance',
        'slug': u'service-enfance',
        'details': u'Rôle du service petite enfance',
        'emails': [u'petite-enfance@example.com'],
        'emails_to_members': False,
    }
    notification = {
        '@type': u'provision',
        'audience': [u'test'],
        'full': True,
        'objects': {
            '@type': 'role',
            'data': [role_payload],
        },
    }
    with pytest.raises(KeyError) as excinfo:
        CmdHoboNotify.process_notification(notification)
    assert excinfo.value.args == ('role without uuid',)
    assert pub.role_class.count() == 0

    # now provide a uuid but remove the name
    notification['objects']['data'][0]['uuid'] = u'12345'
    del notification['objects']['data'][0]['name']
    with pytest.raises(ValueError) as excinfo:
        CmdHoboNotify.process_notification(notification)
    assert excinfo.value.args == ('invalid role',)
    assert pub.role_class.count() == 0
def test_process_user_deprovision(pub):
    """Deprovisioning keeps the user record but sets deleted_timestamp."""
    user_class = pub.user_class

    # setup an hobo profile
    from wcs.ctl.check_hobos import CmdCheckHobos

    user_class.wipe()
    pub.role_class.wipe()
    CmdCheckHobos().update_profile(PROFILE, pub)

    pierre = user_class()
    pierre.name = 'Pierre'
    pierre.name_identifiers = ['a' * 32]
    pierre.store()

    notification = {
        '@type': 'deprovision',
        'issuer': 'http://idp.example.net/idp/saml/metadata',
        'audience': ['test'],
        'objects': {
            '@type': 'user',
            'data': [
                {
                    'uuid': u'a' * 32,
                }
            ],
        },
    }

    assert user_class.count() == 1
    assert len(user_class.select([st.NotNull('deleted_timestamp')])) == 0
    CmdHoboNotify.process_notification(notification)
    # still counted, now flagged with a deletion timestamp
    assert user_class.count() == 1
    assert len(user_class.select([st.NotNull('deleted_timestamp')])) == 1
def test_process_user_deprovision_with_data(pub):
    """Deprovision a user referenced by a stored form submission.

    As in the plain deprovision test, the user record is kept and ends up
    with a non-null deleted_timestamp.
    """
    from wcs.formdef import FormDef

    user_class = pub.user_class

    # setup an hobo profile
    from wcs.ctl.check_hobos import CmdCheckHobos

    user_class.wipe()
    FormDef.wipe()
    CmdCheckHobos().update_profile(PROFILE, pub)

    pierre = user_class()
    pierre.name = 'Pierre'
    pierre.name_identifiers = ['a' * 32]
    pierre.store()

    # a form with no fields and one submission attached to the user
    formdef = FormDef()
    formdef.name = 'foobar'
    formdef.url_name = 'foobar'
    formdef.fields = []
    formdef.store()

    submission = formdef.data_class()()
    submission.user_id = pierre.id
    submission.store()

    notification = {
        '@type': 'deprovision',
        'issuer': 'http://idp.example.net/idp/saml/metadata',
        'audience': ['test'],
        'objects': {
            '@type': 'user',
            'data': [
                {
                    'uuid': u'a' * 32,
                }
            ],
        },
    }

    assert user_class.count() == 1
    assert len(user_class.select([st.NotNull('deleted_timestamp')])) == 0
    CmdHoboNotify.process_notification(notification)
    assert user_class.count() == 1
    assert len(user_class.select([st.NotNull('deleted_timestamp')])) == 1