wcs/wcs/ctl/hobo_notify.py

219 lines
8.6 KiB
Python

# w.c.s. - web application for online forms
# Copyright (C) 2005-2014 Entr'ouvert
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import os
import sys
import json
from quixote import get_publisher
from wcs.roles import Role
from ..qommon.ctl import Command
from ..qommon.publisher import get_cfg
from wcs.admin.settings import UserFieldsFormDef
from wcs.qommon import force_str
from wcs.qommon.misc import json_encode_helper
class CmdHoboNotify(Command):
    """Control command applying a provisioning notification to all tenants.

    A notification is a JSON document carrying an action (``provision`` or
    ``deprovision``), an audience (list of entity ids) and a list of
    objects of a single type; the objects are dispatched to the matching
    ``provision_<type>`` class method (``role`` and ``user`` are defined
    here).
    """

    name = 'hobo_notify'

    def execute(self, base_options, sub_options, args):
        """Load a notification and process it for every configured tenant.

        ``sub_options.extra`` values, if any, are copied into the ``extra``
        section of the configuration.  The notification is read from the
        location given by ``args[0]`` (see :meth:`load_notification`); the
        process exits with status 1 when it is not structurally valid.
        Tenants whose directory has no ``config.pck`` file are skipped.
        """
        self.base_options = base_options
        if sub_options.extra:
            if not self.config.has_section('extra'):
                self.config.add_section('extra')
            for i, extra in enumerate(sub_options.extra):
                self.config.set('extra', 'cmd_line_extra_%d' % i, extra)

        notification = self.load_notification(args)
        if not self.check_valid_notification(notification):
            sys.exit(1)

        from .. import publisher
        publisher.WcsPublisher.configure(self.config)
        pub = publisher.WcsPublisher.create_publisher(
            register_tld_names=False)
        global_app_dir = pub.app_dir
        for hostname in publisher.WcsPublisher.get_tenants():
            app_dir = os.path.join(global_app_dir, hostname)
            if not os.path.exists(os.path.join(app_dir, 'config.pck')):
                continue
            # point the shared publisher at this tenant before processing
            pub.app_dir = app_dir
            pub.set_config()
            self.process_notification(notification, pub)

    @classmethod
    def load_notification(cls, args):
        """Return the notification decoded from JSON.

        ``args[0]`` is either ``'-'`` (read from standard input) or the
        path of a file containing the JSON document.
        """
        if args[0] == '-':
            # get environment definition from stdin
            return json.load(sys.stdin)
        # open() instead of the removed file() builtin; the context manager
        # makes sure the handle is closed even if decoding fails.
        with open(args[0]) as fd:
            return json.load(fd)

    @classmethod
    def check_valid_notification(cls, notification):
        """Return True when ``notification`` has the expected structure.

        The notification must be a dict with a ``@type`` of ``provision``
        or ``deprovision``, an ``audience`` list and an ``objects`` dict
        holding a ``@type`` and a ``data`` list.  A missing ``@type`` makes
        the notification invalid rather than raising ``KeyError``.
        """
        return isinstance(notification, dict) \
            and notification.get('@type') in ['provision', 'deprovision'] \
            and 'objects' in notification \
            and 'audience' in notification \
            and isinstance(notification['audience'], list) \
            and isinstance(notification['objects'], dict) \
            and '@type' in notification['objects'] \
            and 'data' in notification['objects'] \
            and isinstance(notification['objects']['data'], list)

    @classmethod
    def process_notification(cls, notification, publisher=None):
        """Apply ``notification`` to the current tenant if it is targeted.

        Nothing is done when the tenant has no ``saml2_providerid`` in its
        ``sp`` configuration or when that id is not part of the
        notification audience.  Otherwise the objects are handed to the
        ``provision_<objects type>`` class method.

        ``publisher`` defaults to the publisher returned by
        ``get_publisher()``.
        """
        publisher = publisher or get_publisher()
        action = notification['@type']
        audience = notification['audience']
        full = notification.get('full', False)
        issuer = notification.get('issuer')

        # Verify tenant is in audience
        entity_id = get_cfg('sp', {}).get('saml2_providerid')
        if not entity_id or entity_id not in audience:
            return

        t = notification['objects']['@type']
        # Now provision/deprovision
        getattr(cls, 'provision_' + t)(publisher, issuer, action,
                                       notification['objects']['data'],
                                       full=full)

    @classmethod
    def check_valid_role(cls, o):
        """Return True when ``o`` has every key required to provision a role."""
        required = ('uuid', 'name', 'emails', 'emails_to_members', 'slug')
        return all(key in o for key in required)

    @classmethod
    def provision_role(cls, publisher, issuer, action, data, full=False):
        """Create, update or remove roles described by ``data``.

        For ``provision`` each entry must satisfy :meth:`check_valid_role`
        (``ValueError`` otherwise); the matching role is looked up with
        ``Role.resolve`` and created when missing, then updated and stored.
        For ``deprovision`` the matching role, if any, is removed.  Every
        entry must have a ``uuid`` (``KeyError`` otherwise).

        When ``full`` is true and the action is ``provision``, roles whose
        uuid was not part of ``data`` are removed afterwards.
        """
        uuids = set()
        for o in data:
            if 'uuid' not in o:
                raise KeyError('role without uuid')
            uuid = force_str(o['uuid'])
            uuids.add(uuid)
            slug = None
            name = None
            if action == 'provision':
                if not cls.check_valid_role(o):
                    raise ValueError('invalid role')
                slug = force_str(o['slug'])
                details = force_str(o.get('details', '')) or None
                name = force_str(o['name'])
                emails = [force_str(email) for email in o['emails']]
                emails_to_members = o['emails_to_members']
            # Find existing role
            role = Role.resolve(uuid, slug, name)
            if not role:
                if action != 'provision':
                    # nothing to deprovision
                    continue
                role = Role(id=uuid)
            if action == 'provision':
                # Provision/rename
                role.name = name
                role.uuid = uuid
                role.slug = slug
                role.emails = emails
                role.details = details
                role.emails_to_members = emails_to_members
                if role.slug.startswith('_'):
                    role.internal = True
                    # NOTE(review): source indentation was lost; assumed this
                    # assignment only applies to internal ('_'-prefixed)
                    # roles -- confirm against the original file.
                    role.allows_backoffice_access = False
                role.store()
            elif action == 'deprovision':
                # Deprovision
                role.remove_self()
        # All roles have been sent
        if full and action == 'provision':
            for role in Role.select():
                if role.uuid not in uuids:
                    role.remove_self()

    @classmethod
    def check_valid_user(cls, o):
        """Return True when ``o`` has every key required to provision a user."""
        required = ('uuid', 'email', 'first_name', 'last_name', 'roles')
        return all(key in o for key in required)

    @classmethod
    def provision_user(cls, publisher, issuer, action, data, full=False):
        """Create, update or remove users described by ``data``.

        Each entry is handled independently: any exception raised while
        processing one entry is reported through
        ``publisher.notify_of_exception`` and the next entry is processed.

        Raises ``NotImplementedError`` when ``full`` is true.
        """
        formdef = UserFieldsFormDef(publisher=publisher)
        User = publisher.user_class

        if full:
            raise NotImplementedError('full is not supported for users')

        for o in data:
            try:
                o = json_encode_helper(o, publisher.site_charset)
                if action == 'provision':
                    if not cls.check_valid_user(o):
                        raise ValueError('invalid user')
                    uuid = o['uuid']
                    users = User.get_users_with_name_identifier(uuid)
                    if len(users) > 1:
                        raise Exception('duplicate users')
                    if users:
                        user = users[0]
                    else:
                        user = User(uuid)
                    user.form_data = user.form_data or {}
                    for field in formdef.fields:
                        # only fields whose id starts with '_' are filled;
                        # the payload key is the id without that prefix
                        if not field.id.startswith('_'):
                            continue
                        field_value = o.get(field.id[1:])
                        if field.convert_value_from_anything:
                            try:
                                field_value = field.convert_value_from_anything(field_value)
                            except ValueError:
                                # report and leave this field untouched
                                publisher.notify_of_exception(sys.exc_info(), context='[PROVISIONNING]')
                                continue
                        user.form_data[field.id] = field_value
                    user.name_identifiers = [uuid]
                    # reset roles
                    user.is_admin = o.get('is_superuser', False)
                    user.roles = []
                    for role_ref in o.get('roles', []):
                        role = Role.resolve(role_ref['uuid'])
                        if role and role.id not in user.roles:
                            user.add_roles([role.id])
                    user.set_attributes_from_formdata(user.form_data)
                    user.store()
                    # verify we did not produce a duplicate
                    users = User.get_users_with_name_identifier(uuid)
                    for duplicate in users:
                        if int(duplicate.id) < int(user.id):  # we are not the first so backoff
                            user.remove_self()
                            break
                elif action == 'deprovision':
                    if 'uuid' not in o:
                        raise KeyError('user without uuid')
                    uuid = o['uuid']
                    users = User.get_users_with_name_identifier(uuid)
                    for user in users:
                        user.remove_self()
            except Exception:
                # best effort: report the failure and go on with next entry
                publisher.notify_of_exception(sys.exc_info(), context='[PROVISIONNING]')
# Call the class-level register() hook; it is not defined in CmdHoboNotify
# itself, so it presumably comes from the Command base class (verify there).
CmdHoboNotify.register()