This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
veridic/acs/alias_mgmt_views.py

533 lines
21 KiB
Python

'''
VERIDIC - Towards a centralized access control system
Copyright (C) 2011 Mikael Ates
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
'''
import logging
from django.contrib import messages
from django.utils.translation import ugettext as _
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.contrib.auth.models import User
from django.views.decorators.csrf import csrf_exempt
from django.http import HttpResponseRedirect
from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
from django.conf import settings
from core import get_alias_in_policy, \
is_policy_user_administrator, \
is_user_administrator, \
is_policy_root_administrator, \
get_aliases_of_user_without_default, get_policy_from_namespace, \
synchronize_users_in_policy, is_self_admin, switch_self_admin_by_id, \
all_users_self_admin_in_policy
from models import UserAlias, Namespace, Policy
from abac.models import Source, LdapSource
from decorators import prevent_access_to_not_user_administrators, \
check_policy_in_session, \
check_authorized_on_users_and_roles
from utils_views import get_policy_from_session
logger = logging.getLogger('acs')
login_url = settings.LOGIN_URL
root_url = settings.ROOT_URL
'''
''''''
Alias Management
''''''
'''
@csrf_exempt
@prevent_access_to_not_user_administrators
def list_users_for_aliases(request):
if request.method == 'POST' and \
('Synchronize' in request.POST or \
'All_self_admin' in request.POST) and \
'policy' in request.POST:
policy = None
try:
policy = Policy.objects.get(id=request.POST['policy'])
except:
messages.add_message(request, messages.ERROR,
_('Unknown policy'))
else:
if not is_policy_user_administrator(request.user, policy):
messages.add_message(request, messages.ERROR,
_('You are not authorized to access %s') %policy)
return HttpResponseRedirect(root_url)
if 'Synchronize' in request.POST:
r = synchronize_users_in_policy(request.user, policy)
if r == 0:
messages.add_message(request, messages.INFO,
_('Users successfully synchronized in %s') %policy)
else:
messages.add_message(request, messages.ERROR,
_('Unable to synchronize all users in %s') %policy)
else:
if all_users_self_admin_in_policy(request.user, policy):
messages.add_message(request, messages.INFO,
_('All users declared in %s successfully set \
self administrators') %policy)
else:
messages.add_message(request, messages.ERROR,
_('Unable to set all users declared in %s \
self administrators') %policy)
'''We present all users'''
users = User.objects.all()
'''We advertise the user about the policy it can adminstrate'''
policies = Policy.objects.all()
plist = []
for p in policies:
if is_user_administrator(request.user) \
or is_policy_root_administrator(request.user, p):
plist.append(p)
if not users:
messages.add_message(request, messages.ERROR,
_('There is no user you can administrate.'))
return HttpResponseRedirect(root_url)
if not plist:
messages.add_message(request, messages.ERROR,
_('There is no policy you can administrate.'))
return HttpResponseRedirect(root_url)
tpl_parameters = \
{'list_any': users,
'authz_policies': plist,
'title': \
'User aliases administration and self-administration enabling',
'type_entity': 'user',
'backlink': root_url}
return render_to_response('list_users_for_aliases.html', tpl_parameters,
context_instance=RequestContext(request))
@csrf_exempt
@check_policy_in_session
@check_authorized_on_users_and_roles
def all_users_self_admin(request):
policy = get_policy_from_session(request)
if all_users_self_admin_in_policy(request.user, policy):
messages.add_message(request, messages.INFO,
_('All users declared in %s successfully set \
self administrators') %policy)
else:
messages.add_message(request, messages.ERROR,
_('Unable to set all users declared in %s \
self administrators') %policy)
return HttpResponseRedirect('mod_policy?id=' + str(policy.id))
@csrf_exempt
@check_policy_in_session
@check_authorized_on_users_and_roles
def list_aliases_in_policy(request):
'''From policy'''
policy = get_policy_from_session(request)
aliases = UserAlias.objects.filter(namespace=policy.namespace)
list_aliases = []
for a in aliases:
if is_self_admin(a):
list_aliases.append((a, True))
else:
list_aliases.append((a, False))
users_not_in_policy = []
for user in User.objects.all():
alias = get_alias_in_policy(user, policy)
if not alias:
users_not_in_policy.append(user)
tpl_parameters = {}
tpl_parameters['aliases'] = list_aliases
tpl_parameters['users_not_in_policy'] = users_not_in_policy
tpl_parameters['title'] = _('Modify or delete a user alias in %s') %policy
tpl_parameters['type_entity'] = 'alias'
tpl_parameters['back_url'] = 'list_aliases_in_policy'
tpl_parameters['backlink'] = 'mod_policy?id=' + str(policy.id)
return render_to_response('list_aliases.html', tpl_parameters,
context_instance=RequestContext(request))
@csrf_exempt
@check_policy_in_session
@check_authorized_on_users_and_roles
def add_alias_only(request):
policy = get_policy_from_session(request)
tpl_parameters = {}
if request.method == 'POST':
if 'cancel' in request.POST:
messages.add_message(request, messages.INFO,
_('Operation canceled'))
return HttpResponseRedirect('mod_policy?id=' + str(policy.id))
if 'alias' in request.POST and request.POST['alias']:
try:
UserAlias.objects.get(alias=request.POST['alias'],
namespace=policy.namespace)
messages.add_message(request, messages.ERROR,
_('Alias already exists.'))
except ObjectDoesNotExist:
try:
ua = UserAlias(alias=request.POST['alias'],
namespace=policy.namespace)
ua.save()
messages.add_message(request, messages.INFO,
_('Alias created.'))
return HttpResponseRedirect('mod_policy?id=' \
+ str(policy.id))
except Exception, err:
messages.add_message(request, messages.ERROR,
_('Error processing alias creation due to: %s.' \
%err))
else:
messages.add_message(request, messages.ERROR,
_('Blank field. Alias not created.'))
tpl_parameters = {'title': _('Add a new alias only in %s' %policy)}
return render_to_response('add_alias_only.html', tpl_parameters,
context_instance=RequestContext(request))
@csrf_exempt
@check_policy_in_session
@check_authorized_on_users_and_roles
def set_user_in_policy(request):
'''The admin must hav admin right on this user'''
policy = get_policy_from_session(request)
if request.method == 'POST':
if not 'id' in request.POST:
logger.error('set_user_in_namespace: no user id provided by %s' \
%request.user)
messages.add_message(request, messages.ERROR, _('Invalid action'))
return HttpResponseRedirect('list_aliases_in_policy')
try:
user = User.objects.get(id=request.POST['id'])
except:
logger.error('set_user_in_namespace: unknown user with id %s' \
%request.POST['id'])
messages.add_message(request, messages.ERROR, _('Unkown user'))
return HttpResponseRedirect('mod_policy?id=' + str(policy.id))
try:
a = UserAlias.objects.get(user=user, namespace=policy.namespace)
logger.error('set_user_in_namespace: \
the user already has the alias %s in this namespace' %a.alias)
messages.add_message(request, messages.ERROR,
_('The user already has the alias %s in this namespace')
%a.alias)
return HttpResponseRedirect('list_aliases_in_policy')
except ObjectDoesNotExist:
pass
except MultipleObjectsReturned:
logger.critical('set_user_in_namespace: \
the user has mulitple aliases in this namespace')
messages.add_message(request, messages.ERROR,
_('The user already has multiple aliases in this namespace'))
return HttpResponseRedirect('list_aliases_in_policy')
if not 'alias' in request.POST or not request.POST['alias']:
alias = user.username
else:
alias = request.POST['alias']
try:
ua = UserAlias(alias=alias, user=user, namespace=policy.namespace)
ua.save()
messages.add_message(request, messages.INFO,
_('User %s added in %s') %(ua, policy))
return HttpResponseRedirect('list_aliases_in_policy')
except:
messages.add_message(request, messages.ERROR,
_('Unable to add %s for %s in %s') %(alias, user, policy))
return HttpResponseRedirect('list_aliases_in_policy')
return HttpResponseRedirect('list_aliases_in_policy')
@csrf_exempt
@prevent_access_to_not_user_administrators
def list_aliases(request, pk=None):
'''From homepage'''
if request.method == 'GET' or pk:
if 'id' in request.GET or pk:
if 'id' in request.GET:
pk = request.GET['id']
user = None
try:
user = User.objects.get(id=pk)
except:
messages.add_message(request, messages.ERROR,
_('Unknown user with id %s') %request.GET['id'])
return HttpResponseRedirect('/list_users_for_aliases')
sources = None
try:
sources = Source.objects.all()
except Exception, err:
logger.error('list_aliases: An error occurred looking for \
sources: %s' % err)
logger.debug('list_aliases: sources found %s' % sources)
source_names = [source.name for source in sources]
'''
Aliases in policies
'''
aliases = get_aliases_of_user_without_default(user)
namespaces_in = []
namespaces_all = []
namespaces_not_in = []
list_any = []
for a in aliases:
if a.namespace.name not in source_names:
namespaces_in.append(a.namespace)
if is_self_admin(a):
list_any.append((a, True))
else:
list_any.append((a, False))
policies = Policy.objects.all()
for p in policies:
if is_user_administrator(request.user) \
or is_policy_root_administrator(request.user, p):
namespaces_all.append(p.namespace)
namespaces_not_in = list(set(namespaces_all) - set(namespaces_in))
'''
Aliases in sources
'''
aliases_sources = []
for source in sources:
if not (isinstance(source.get_source_instance(), LdapSource) \
and source.get_source_instance().is_auth_backend):
ns = None
try:
ns = Namespace.objects.get(name=source.name)
except Exception, err:
logger.error('list_aliases: An error occurred \
looking for the namespace of %s: %s\
sources: %s' % (source.name, err))
else:
try:
u = UserAlias.objects.get(user=user, namespace=ns)
aliases_sources.append((ns, u))
except:
aliases_sources.append((ns, None))
back_url = '/list_aliases?id=%s' %pk
tpl_parameters = {'namespaces_not_in': namespaces_not_in,
'list_any': list_any,
'title': _('Add or remove an alias of %s') %user,
'user': user,
'type_entity': 'alias',
'aliases_sources': aliases_sources,
'back_url': back_url,
'backlink': 'list_users_for_aliases'}
return render_to_response('list_aliases_and_add.html',
tpl_parameters, context_instance=RequestContext(request))
messages.add_message(request, messages.ERROR, _('Unknown action'))
return list_users_for_aliases(request)
@csrf_exempt
@prevent_access_to_not_user_administrators
def switch_self_admin(request):
if request.method != 'POST' or not 'id' in request.POST:
messages.add_message(request, messages.ERROR,
_('Unable to treat request'))
return HttpResponseRedirect(root_url)
ua = None
try:
ua = UserAlias.objects.get(id=request.POST['id'])
except:
messages.add_message(request, messages.ERROR,
_('Unable to find the user Alias'))
if switch_self_admin_by_id(request.POST['id']):
messages.add_message(request, messages.INFO,
_('%s successfully switched') %ua)
else:
messages.add_message(request, messages.ERROR,
_('Unable to switch %s') %ua)
back_url = root_url
if 'back_url' in request.POST:
back_url = request.POST['back_url']
return HttpResponseRedirect(back_url)
@csrf_exempt
@check_authorized_on_users_and_roles
def set_user_in_policy_from_home(request):
'''The admin must hav admin right on this user'''
if request.method == 'POST':
if not 'alias' in request.POST or not 'user_id' in request.POST \
or not 'namespace_id' in request.POST:
logger.error('set_user_in_namespace_from_home: \
missing data from %s' %request.user)
messages.add_message(request, messages.ERROR, _('Invalid action'))
HttpResponseRedirect('/list_users_for_aliases')
user = None
try:
user = User.objects.get(id=request.POST['user_id'])
except:
logger.error('set_user_in_namespace_from_home: \
unknown user with id %s' %request.POST['user_id'])
messages.add_message(request, messages.ERROR, _('Unkown user'))
HttpResponseRedirect('/list_users_for_aliases')
backurl = '/list_aliases?id=' + request.POST['user_id']
'''Add user to a policy means check admin right on this policy'''
ns = None
try:
ns = Namespace.objects.get(id=request.POST['namespace_id'])
except:
logger.error('set_user_in_namespace_from_home: \
unknown namespace with id %s' %request.POST['namespace_id'])
messages.add_message(request, messages.ERROR,
_('Unkown namespace'))
return HttpResponseRedirect(backurl)
policy = get_policy_from_namespace(ns)
'''Create alias'''
try:
a = UserAlias.objects.get(user=user, namespace=ns)
logger.error('set_user_in_namespace_from_home: the user %s \
already has the alias %s in this namespace' %(user, a))
messages.add_message(request, messages.ERROR,
_('The user already has the alias %s in this namespace') %a)
return HttpResponseRedirect(backurl)
except ObjectDoesNotExist:
pass
except MultipleObjectsReturned:
logger.critical('set_user_in_namespace_from_home: \
the user has mulitple aliases in this namespace')
messages.add_message(request, messages.ERROR,
_('The user already has multiple aliases in this namespace'))
return HttpResponseRedirect(backurl)
if not 'alias' in request.POST or not request.POST['alias']:
alias = user.username
else:
alias = request.POST['alias']
ua = None
try:
ua = UserAlias(alias=alias, user=user, namespace=ns)
ua.save()
except:
messages.add_message(request, messages.ERROR,
_('Unable to add %s for %s in %s') %(alias, user, ns))
return HttpResponseRedirect(backurl)
'''Add alias to the admin view of the policy'''
policy.admin_view.users.add(ua)
messages.add_message(request, messages.INFO,
_('User %s added in %s') %(ua, ns))
return HttpResponseRedirect(backurl)
messages.add_message(request, messages.ERROR, _('Unknown action'))
HttpResponseRedirect('/list_users_for_aliases')
@csrf_exempt
@prevent_access_to_not_user_administrators
def set_user_in_source(request):
'''The admin must have admin right on this user'''
if request.method == 'POST':
if not 'alias' in request.POST or not 'user_id' in request.POST \
or not 'namespace_id' in request.POST:
logger.error('set_user_in_namespace_from_home: \
missing data from %s' %request.user)
messages.add_message(request, messages.ERROR, _('Invalid action'))
HttpResponseRedirect('/list_users_for_aliases')
user = None
try:
user = User.objects.get(id=request.POST['user_id'])
except:
logger.error('set_user_in_source: \
unknown user with id %s' %request.POST['user_id'])
messages.add_message(request, messages.ERROR, _('Unkown user'))
HttpResponseRedirect('/list_users_for_aliases')
backurl = '/list_aliases?id=' + request.POST['user_id']
ns = None
try:
ns = Namespace.objects.get(id=request.POST['namespace_id'])
except:
logger.error('set_user_in_source: \
unknown namespace with id %s' %request.POST['namespace_id'])
messages.add_message(request, messages.ERROR,
_('Unkown namespace'))
return HttpResponseRedirect(backurl)
'''Create alias'''
try:
a = UserAlias.objects.get(user=user, namespace=ns)
logger.error('set_user_in_source: the user %s \
already has the alias %s in this namespace' %(user, a))
messages.add_message(request, messages.ERROR,
_('The user already has the alias %s in this namespace') %a)
return HttpResponseRedirect(backurl)
except ObjectDoesNotExist:
pass
except MultipleObjectsReturned:
logger.critical('set_user_in_source: \
the user has mulitple aliases in this namespace')
messages.add_message(request, messages.ERROR,
_('The user already has multiple aliases in this namespace'))
return HttpResponseRedirect(backurl)
if not 'alias' in request.POST or not request.POST['alias']:
alias = user.username
else:
alias = request.POST['alias']
ua = None
try:
ua = UserAlias(alias=alias, user=user, namespace=ns)
ua.save()
except:
messages.add_message(request, messages.ERROR,
_('Unable to add %s for %s in %s') %(alias, user, ns))
return HttpResponseRedirect(backurl)
messages.add_message(request, messages.INFO,
_('User %s added in %s') %(ua, ns))
return HttpResponseRedirect(backurl)
messages.add_message(request, messages.ERROR, _('Unknown action'))
HttpResponseRedirect('/list_users_for_aliases')
@csrf_exempt
@check_policy_in_session
@check_authorized_on_users_and_roles
def synchronize_users_in_policy_view(request):
policy = get_policy_from_session(request)
r = synchronize_users_in_policy(request.user, policy)
if r == 0:
messages.add_message(request, messages.INFO,
_('Users successfully synchronized in %s') %policy)
else:
messages.add_message(request, messages.ERROR,
_('Unable to synchronize all users in %s') %policy)
return HttpResponseRedirect('mod_policy?id=' + str(policy.id))