This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
portail-citoyen2/portail_citoyen/management/commands/load-wcs-roles.py

82 lines
3.3 KiB
Python

import random
import string
import locale
from optparse import make_option
import os.path
import logging
import urlparse
import re
try:
from django.contrib.auth import get_user_model
except ImportError:
from django.contrib.auth.models import User
get_user_model = lambda: User
from django.core.management.base import BaseCommand, CommandError, NoArgsCommand
from django.db import transaction
from django.utils.http import urlencode
from django.contrib.auth.models import Group
from authentic2.saml.models import LibertyProvider, LibertyFederation
from data_source_plugin.signature import sign_url
import requests
logger = logging.getLogger()
class Command(BaseCommand):
args = '<wcs_data_path wcs_data_path ...>'
help = '''Load W.C.S. roles'''
option_list = BaseCommand.option_list + (
make_option('--url', action="store"),
make_option('--prefix', action="store", default='AuQuo::'),
make_option('--orig', action="store"),
make_option('--key', action="store"),
make_option('--email', action="store"),
make_option('--delete-orphaned-roles', action="store_true",
default=False),
make_option('--debug', action="store_true"),
make_option('--verbose', action="store_true"),
)
@transaction.commit_on_success
def handle(self, *args, **options):
locale.setlocale(locale.LC_ALL, '')
if options['verbose'] or options['debug']:
handler = logging.StreamHandler()
handler.setLevel(level=logging.DEBUG if options['debug'] else logging.INFO)
logger.addHandler(handler)
for key in ('url', 'prefix', 'orig', 'key', 'email'):
if not options.get(key):
raise CommandError('the --%s option must be provided' % key)
url = urlparse.urljoin(options['url'], '/roles')
url += '?' + urlencode({ 'format': 'json', 'orig': options['orig'], 'email': options['email']})
logger.debug('signing using key %r', options['key'])
url = sign_url(url, options['key'])
logger.debug('sending GET to %s', url)
response = requests.get(url)
logger.debug('got response %r', response.content)
json_response = response.json()
if json_response.get('err') == 1:
raise CommandError('Web Service error: %s' % json_response['err_class'])
prefix = options['prefix'].decode('utf-8')
if not prefix:
raise CommandError('the prefix %s is invalid' % options['prefix'])
# clean ending
prefix = re.sub(':+$', '', prefix)
default_groups = [{'text': 'Admin'},{'text': 'BackOffice'}]
all_groups = set()
for role in json_response.get('data', [])+default_groups:
role_name = role['text']
group_name = u'{0}::{1}'.format(prefix, role_name)
group, created = Group.objects.get_or_create(name=group_name)
all_groups.add(group.id)
if created:
logger.info('created role %s', group.name)
if options['delete_orphaned_roles']:
for group in Group.objects.exclude(id__in=all_groups) \
.filter(name__startswith=prefix+'::'):
logger.info('deleted orphaned role %s', group.name)
group.delete()