hobo_notify: add user provisionning (#8444)

This commit is contained in:
Benjamin Dauvergne 2015-09-30 19:07:06 +02:00
parent ebe2e5b071
commit 02b6add5cb
2 changed files with 285 additions and 3 deletions

View File

@ -4,6 +4,7 @@ from quixote import cleanup
from wcs.ctl.hobo_notify import CmdHoboNotify
from wcs.roles import Role
from wcs.users import User
from utilities import create_temporary_pub
@ -32,7 +33,7 @@ def teardown_function(function):
Role.wipe()
def test_process_notification_wrong_audience():
def test_process_notification_role_wrong_audience():
notification = {
'@type': u'provision',
'audience': [u'coin'],
@ -75,7 +76,7 @@ def test_process_notification_wrong_audience():
assert Role.select()[0].emails_to_members is False
def test_process_notification():
def test_process_notification_role():
notification = {
'@type': u'provision',
'audience': [u'test'],
@ -151,7 +152,7 @@ def test_process_notification():
assert Role.select()[0].emails == ['petite-enfance@example.com']
assert Role.select()[0].emails_to_members is True
def test_process_notification_deprovision():
def test_process_notification_role_deprovision():
notification = {
'@type': u'deprovision',
'audience': [u'test'],
@ -176,3 +177,223 @@ def test_process_notification_deprovision():
assert Role.select()[0].slug == 'service-ett-civil'
CmdHoboNotify.process_notification(notification)
assert Role.count() == 0
PROFILE = {
'fields': [
{
'kind': 'title',
'description': '',
'required': False,
'user_visible': True,
'label': u'Civilité',
'disabled': False,
'user_editable': True,
'asked_on_registration': False,
'name': 'title'
},
{
'kind': 'string',
'description': '',
'required': True,
'user_visible': True,
'label': u'Prénom',
'disabled': False,
'user_editable': True,
'asked_on_registration': True,
'name': 'first_name'
},
{
'kind': 'string',
'description': '',
'required': True,
'user_visible': True,
'label': 'Nom',
'disabled': False,
'user_editable': True,
'asked_on_registration': True,
'name': 'last_name'
},
{
'kind': 'email',
'description': '',
'required': True,
'user_visible': True,
'label': u'Adresse électronique',
'disabled': False,
'user_editable': True,
'asked_on_registration': False,
'name': 'email'
},
{
'kind': 'string',
'description': '',
'required': False,
'user_visible': True,
'label': 'Addresse',
'disabled': False,
'user_editable': True,
'asked_on_registration': False,
'name': 'address'
},
{
'kind': 'string',
'description': '',
'required': False,
'user_visible': True,
'label': 'Code postal',
'disabled': False,
'user_editable': True,
'asked_on_registration': False,
'name': 'zipcode'
},
{
'kind': 'string',
'description': '',
'required': False,
'user_visible': True,
'label': 'Commune',
'disabled': False,
'user_editable': True,
'asked_on_registration': False,
'name': 'city'
},
{
'kind': 'string',
'description': '',
'required': False,
'user_visible': True,
'label': u'Téléphone',
'disabled': False,
'user_editable': True,
'asked_on_registration': False,
'name': 'phone'
},
{
'kind': 'string',
'description': '',
'required': False,
'user_visible': True,
'label': 'Mobile',
'disabled': False,
'user_editable': True,
'asked_on_registration': False,
'name': 'mobile'
},
{
'kind': 'string',
'description': '',
'required': False,
'user_visible': True,
'label': 'Pays',
'disabled': True,
'user_editable': True,
'asked_on_registration': False,
'name': 'country'
},
{
'kind': 'string',
'description': '',
'required': False,
'user_visible': True,
'label': 'Date de naissance',
'disabled': True,
'user_editable': True,
'asked_on_registration': False,
'name': 'birthdate'
}
]
}
def test_process_notification_user_provision():
# create some roles
from wcs.ctl.check_hobos import CmdCheckHobos
# setup an hobo profile
CmdCheckHobos().update_profile(PROFILE, pub)
notification = {
'@type': u'provision',
'audience': [u'test'],
'full': True,
'objects': {
'@type': 'role',
'data': [
{
'name': u'Service enfance',
'slug': u'service-enfance',
'description': u'Rôle du service petite enfance',
'uuid': u'12345',
'emails': [u'petite-enfance@example.com'],
'emails_to_members': False,
},
{
'name': u'Service état civil',
'slug': u'service-ett-civil',
'description': u'Rôle du service état civil',
'uuid': u'xyz',
'emails': [u'etat-civil@example.com'],
'emails_to_members': True,
},
]
}
}
assert Role.count() == 1
assert Role.select()[0].name == 'Service étt civil'
assert Role.select()[0].slug == 'service-ett-civil'
assert Role.select()[0].details is None
assert Role.select()[0].emails is None
assert Role.select()[0].emails_to_members is False
existing_role_id = Role.select()[0].id
CmdHoboNotify.process_notification(notification)
assert Role.count() == 2
old_role = Role.get(existing_role_id)
assert old_role.name == 'Service état civil'
assert old_role.slug == 'xyz'
assert old_role.details == 'Rôle du service état civil'
assert old_role.emails == ['etat-civil@example.com']
assert old_role.emails_to_members is True
new_role = Role.get('12345')
assert new_role.name == 'Service enfance'
assert new_role.slug == '12345'
assert new_role.details == 'Rôle du service petite enfance'
assert new_role.emails == ['petite-enfance@example.com']
assert new_role.emails_to_members is False
notification = {
u'@type': u'provision',
u'issuer': 'http://idp.example.net/idp/saml/metadata',
u'audience': [u'test'],
u'objects': {
u'@type': 'user',
u'data': [
{
u'uuid': u'a' * 32,
u'first_name': u'John',
u'last_name': u'Doe',
u'email': u'john.doe@example.net',
u'roles': [
{
u'uuid': u'12345',
u'name': u'Service petite enfance',
u'description': u'etc.',
},
{
u'uuid': u'xyz',
u'name': u'Service état civil',
u'description': u'etc.',
},
],
}
]
}
}
CmdHoboNotify.process_notification(notification)
assert User.count() == 1
user = User.select()[0]
assert user.form_data is not None
assert user.form_data['_email'] == 'john.doe@example.net'
assert user.email == 'john.doe@example.net'
assert user.form_data['_first_name'] == 'John'
assert user.form_data['_last_name'] == 'Doe'
assert user.name_identifiers == ['a'*32]
assert set(user.roles) == set(['12345', old_role.id])

View File

@ -22,6 +22,7 @@ from quixote import get_publisher
from wcs.roles import Role
from qommon.ctl import Command
from qommon.publisher import get_cfg
from wcs.qommon.misc import json_encode_helper
class CmdHoboNotify(Command):
@ -143,4 +144,64 @@ class CmdHoboNotify(Command):
if role.slug not in uuids:
role.remove_self()
@classmethod
def check_valid_user(cls, o):
return 'uuid' in o \
and 'email' in o \
and 'first_name' in o \
and 'last_name' in o \
and 'roles' in o
@classmethod
def provision_user(cls, publisher, issuer, action, data, full=False):
from wcs.users import User
from wcs.roles import Role
assert not full # full is not supported for users
for o in data:
try:
o = json_encode_helper(o, publisher.site_charset)
if action == 'provision':
assert cls.check_valid_user(o)
uuid = o['uuid']
users = User.get_users_with_name_identifier(uuid)
assert len(users) < 2 # we are fucked if it's not the case...
if users:
user = users[0]
else:
user = User(uuid)
user.form_data = user.form_data or {}
user.form_data['_email'] = o['email']
user.form_data['_first_name'] = o['first_name']
user.form_data['_last_name'] = o['last_name']
user.name_identifiers = [uuid]
role_uuids = [role['uuid'] for role in o['roles']]
for role_uuid in role_uuids:
try:
role = Role.get(role_uuid)
except KeyError:
try:
role = Role.get_on_index(role_uuid, 'slug')
except KeyError:
continue
if role.id not in user.roles:
user.add_roles([role.id])
user.set_attributes_from_formdata(user.form_data)
user.store()
# verify we did not produce a doublon
users = User.get_users_with_name_identifier(uuid)
for doublon in users:
if int(doublon.id) < int(user.id): # we are not the first so backoff
user.remove_self()
break
elif action == 'deprovision':
assert 'uuid' in o
uuid = o['uuid']
users = User.get_users_with_name_identifier(uuid)
for user in users:
user.remove_self()
except Exception, e:
publisher.notify_of_exception(sys.exc_info(), context='[PROVISIONNING]')
CmdHoboNotify.register()