This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
veridic/acs/acs_administration_views.py

653 lines
27 KiB
Python

'''
VERIDIC - Towards a centralized access control system
Copyright (C) 2011 Mikael Ates
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
'''
import logging
from django.contrib import messages
from django.conf import settings
from django.views.decorators.csrf import csrf_exempt
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.contrib.contenttypes.models import ContentType
from django.http import HttpResponseRedirect
from django.contrib.auth.models import User
from django.utils.translation import ugettext as _
from core import set_default_alias, is_in_policy, set_default_alias, \
is_admin_permission_in_policy, is_default_alias
from decorators import prevent_access_to_not_policy_root_administrators
from models import UserAlias, Role, AcsObject, View, Action, Activity, \
Namespace, AcsPermission
from forms import AddRoleForm, AddViewForm, RoleChangeForm, ViewChangeForm, \
AdminViewChangeForm
from views import return_add_any, return_list_any, return_mod_any, \
return_add_permission_form
from utils_views import get_policy_from_session, \
get_who_from_one_post_field, \
get_what_admin_from_one_post_field
# Module-wide logger; every view below logs through the 'acs' channel.
logger = logging.getLogger('acs')
# Values copied from the Django settings when this module is imported.
login_url = settings.LOGIN_URL
root_url = settings.ROOT_URL
'''
''''''
Administration of A.C.S. views
''''''
'''
@csrf_exempt
@prevent_access_to_not_policy_root_administrators
def add_admin_permission(request):
    '''
    Give permissions to users or admin roles to administrate entities
    in policies i.e., to be able to modify or set permission on entities
    in the policy.
    (Does not give the right to give administration permissions,
    only root users can give administration permissions)

    On POST, the 'who_matches' and 'what_matches' fields are resolved to
    objects and an AcsPermission using the 'administration' action is
    saved. The outcome is reported through the messages framework and
    every path ends by rendering the add-administration-permission form.
    '''
    policy = get_policy_from_session(request)
    if request.method != 'POST':
        return return_add_admin_permission_form(request)

    logger.debug('add_admin_permission: POST received %s', request.POST)
    if 'who_matches' not in request.POST:
        messages.add_message(request, messages.ERROR,
            _('No object for Who provided'))
        return return_add_admin_permission_form(request)
    if 'what_matches' not in request.POST:
        messages.add_message(request, messages.ERROR,
            _('No object for What provided'))
        return return_add_admin_permission_form(request)

    try:
        who = get_who_from_one_post_field(request, 'who_matches')
        what = get_what_admin_from_one_post_field(request, 'what_matches')
        how = Action.objects.get(name='administration')
    except Exception as err:
        # Deliberately broad: the lookup helpers are defined elsewhere and
        # any failure is reported to the user instead of raising a 500.
        logger.error('add_admin_permission: '
            'Fail to find an object with error %s', err)
        messages.add_message(request, messages.ERROR, err)
        return return_add_admin_permission_form(request)

    # If who is a role, it must be an administration role of this policy.
    if isinstance(who, Role) and who not in policy.admin_roles.all():
        logger.error('add_admin_permission: try to add a permission on '
            'the normal role %s in add administration permission by %s',
            who, request.user)
        # Interpolate after translation so the catalog lookup can match.
        messages.add_message(request, messages.ERROR,
            _('You cannot add a permission on a normal role here %s')
            % request.user)
        # Stay on the administration form like every other error path
        # (the original jumped to the regular permission form here).
        return return_add_admin_permission_form(request)

    # Here we check that who is not what.
    same_type = ContentType.objects.get_for_model(who) \
        == ContentType.objects.get_for_model(what)
    if same_type and who.id == what.id:
        logger.info('add_admin_permission: '
            'who and what are a same target %s', who)
        messages.add_message(request, messages.ERROR,
            _('Who and what are a same target %s') % who)
        return return_add_admin_permission_form(request)

    # A user alias can only receive the permission when a default alias
    # can be set for its user.
    if isinstance(who, UserAlias) and not set_default_alias(who.user):
        logger.critical('add_admin_permission: '
            '%s can not be initialized for administration', who.user)
        messages.add_message(request, messages.ERROR,
            _('Unable to set permission'))
        return return_add_admin_permission_form(request)

    p = AcsPermission(who=who, what=what, how=how)
    # Defensive check kept from the original code.
    if not p:
        logger.error('add_admin_permission: Fail to create permission')
        messages.add_message(request, messages.ERROR,
            _('Fail to create permission'))
        return return_add_admin_permission_form(request)
    try:
        p.save()
    except Exception as err:
        logger.error('add_admin_permission: '
            'Fail to save permission with error: %s', err)
        messages.add_message(request, messages.ERROR,
            _('Fail to save permission with error: %s') % err)
        return return_add_admin_permission_form(request)

    logger.info('add_admin_permission: Permission added: %s', p)
    messages.add_message(request, messages.INFO,
        _('Permission added'))
    return return_add_admin_permission_form(request)
@prevent_access_to_not_policy_root_administrators
def return_add_admin_permission_form(request):
    '''
    Render the form used to grant an administration permission.
    Only root policy administrators are able to give administration
    permissions.
    Who: aliases of the 'Default' namespace and admin roles of the policy
    What: the "who" candidates plus every role, object, view, action and
    activity of the policy namespace, and the policy's admin views
    '''
    policy = get_policy_from_session(request)
    default_ns = Namespace.objects.get(name='Default')

    who_candidates = list(UserAlias.objects.filter(namespace=default_ns))
    who_candidates.extend(policy.admin_roles.all())

    # "What" starts as a copy of "who" and grows with the policy entities.
    what_candidates = list(who_candidates)
    for model in (Role, AcsObject, View, Action, Activity):
        what_candidates.extend(
            model.objects.filter(namespace=policy.namespace))
    what_candidates.extend(policy.admin_views.all())

    context = {
        'who_to_display': who_candidates,
        'what_to_display': what_candidates,
        'backlink': 'mod_policy?id=' + str(policy.id),
        'title': _('Add an administration permission'),
    }
    return render_to_response('add_admin_permission.html', context,
        context_instance=RequestContext(request))
@csrf_exempt
@prevent_access_to_not_policy_root_administrators
def add_admin_role(request):
    '''
    Create a new administration role in the policy stored in the session.

    A POST carrying 'cancel' redirects to the policy page. A valid POST
    saves the role, registers it in policy.admin_roles and in the roles of
    policy.admin_view, then redirects to the policy page. Any other
    request renders the add form (bound, with its errors, when the POST
    was invalid).
    '''
    policy = get_policy_from_session(request)
    policy_url = 'mod_policy?id=' + str(policy.id)
    if request.method == 'POST':
        if 'cancel' in request.POST:
            messages.add_message(request, messages.INFO,
                _('Operation canceled'))
            return HttpResponseRedirect(policy_url)
        form = AddRoleForm(request.POST)
        if form.is_valid():
            role = form.save()
            logger.debug('add_admin_role: admin role %s created', role)
            policy.admin_roles.add(role)
            logger.debug('add_admin_role: role added to %s',
                policy.admin_roles)
            # NOTE(review): policy.admin_view (singular) is presumably the
            # view gathering the policy administration entities - confirm.
            policy.admin_view.roles.add(role)
            logger.debug('add_admin_role: role added to %s',
                policy.admin_view)
            messages.add_message(request, messages.INFO,
                _('Administration role %s added') % role)
            return HttpResponseRedirect(policy_url)
    else:
        form = AddRoleForm()
    # Interpolate after translation so the catalog lookup can match.
    title = _('Add a new administration role in %s') % policy
    return return_add_any(request, form, title)
@csrf_exempt
@prevent_access_to_not_policy_root_administrators
def add_admin_view(request):
    '''
    Create a new administration view in the policy stored in the session.

    A POST carrying 'cancel' redirects to the policy page. A valid POST
    saves the view, registers it in policy.admin_views and in the views of
    policy.admin_view, then redirects to the policy page. Any other
    request renders the add form (bound, with its errors, when the POST
    was invalid).
    '''
    policy = get_policy_from_session(request)
    policy_url = 'mod_policy?id=' + str(policy.id)
    if request.method == 'POST':
        if 'cancel' in request.POST:
            messages.add_message(request, messages.INFO,
                _('Operation canceled'))
            return HttpResponseRedirect(policy_url)
        form = AddViewForm(request.POST)
        if form.is_valid():
            view = form.save()
            logger.debug('add_admin_view: admin view %s created', view)
            policy.admin_views.add(view)
            # The original logged this step under the 'add_admin_role'
            # prefix, which pointed at the wrong function.
            logger.debug('add_admin_view: view added to %s',
                policy.admin_views)
            # NOTE(review): policy.admin_view (singular) is presumably the
            # view gathering the policy administration entities - confirm.
            policy.admin_view.views.add(view)
            logger.debug('add_admin_view: view added to %s',
                policy.admin_view)
            messages.add_message(request, messages.INFO,
                _('Administration view %s added') % view)
            return HttpResponseRedirect(policy_url)
    else:
        form = AddViewForm()
    # Interpolate after translation so the catalog lookup can match.
    title = _('Add a new administration view in %s') % policy
    return return_add_any(request, form, title)
@csrf_exempt
@prevent_access_to_not_policy_root_administrators
def list_admin_roles(request):
    '''
    List the administration roles of the policy stored in the session so
    that one of them can be modified or deleted.
    '''
    policy = get_policy_from_session(request)
    # Interpolate after translation so the catalog lookup can match.
    title = _('Modify or delete an administration role in %s') % policy
    type_entity = 'admin_role'
    backlink = 'mod_policy?id=' + str(policy.id)
    back_url = 'list_admin_roles'
    return return_list_any(request, policy.admin_roles.all(), title,
        type_entity, backlink=backlink, back_url=back_url)
@csrf_exempt
@prevent_access_to_not_policy_root_administrators
def list_admin_views(request):
    '''
    List the administration views of the policy stored in the session so
    that one of them can be modified or deleted.
    '''
    policy = get_policy_from_session(request)
    # Interpolate after translation so the catalog lookup can match.
    title = _('Modify or delete an administration view in %s') % policy
    type_entity = 'admin_view'
    backlink = 'mod_policy?id=' + str(policy.id)
    back_url = 'list_admin_views'
    return return_list_any(request, policy.admin_views.all(), title,
        type_entity, backlink=backlink, back_url=back_url)
@csrf_exempt
@prevent_access_to_not_policy_root_administrators
def list_admin_permissions(request):
    '''
    List the administration permissions of the current policy.
    Every permission whose action is 'administration' is fetched, then
    only those accepted by is_admin_permission_in_policy for the policy
    stored in the session are handed to the list template.
    Only accessible by root policy administrators.
    '''
    policy = get_policy_from_session(request)
    admin_action = Action.objects.get(name='administration')
    action_type = ContentType.objects.get_for_model(admin_action)
    candidates = AcsPermission.objects.filter(
        content_type_how__pk=action_type.id,
        object_id_how=admin_action.id)
    logger.debug('list_admin_permissions: all admin permissions %s' \
        % candidates)
    kept = [perm for perm in candidates
            if is_admin_permission_in_policy(perm, policy)]
    return return_list_any(request, kept,
        _('Delete an administration permission'), 'admin_permission',
        template_name='list_permissions.html',
        backlink='mod_policy?id=' + str(policy.id),
        back_url='list_admin_permissions')
@csrf_exempt
@prevent_access_to_not_policy_root_administrators
def mod_admin_role(request):
    '''
    Display (GET) or apply (POST) the modification of an administration
    role of the policy stored in the session.

    We deal with User to be able to give permissions to users not yet
    provided with a default alias: the form lists User objects, and on
    save each selected user is replaced by the alias returned by
    set_default_alias.

    The role id is read from the 'admin_role' GET parameter or the 'role'
    POST field. An unknown role, a role that is not an administration role
    of the policy, or another HTTP method redirects to the role list with
    an error message.
    '''
    policy = get_policy_from_session(request)
    list_url = '/list_admin_roles'

    if request.method == 'GET':
        role_id = request.GET.get('admin_role')
    elif request.method == 'POST':
        role_id = request.POST.get('role')
    else:
        messages.add_message(request, messages.ERROR,
            _('Unknown HTTP method %s') % request.method)
        return HttpResponseRedirect(list_url)

    role = None
    if role_id:
        try:
            role = Role.objects.get(id=role_id)
        except (Role.DoesNotExist, ValueError, TypeError):
            # Unknown or malformed id; reported just below.
            role = None
    if role is None:
        messages.add_message(request, messages.ERROR,
            _('Unknown administration role'))
        return HttpResponseRedirect(list_url)
    if role not in policy.admin_roles.all():
        messages.add_message(request, messages.ERROR,
            _('%s is not an administration role of %s') % (role, policy))
        return HttpResponseRedirect(list_url)

    if request.method == 'POST':
        form = RoleChangeForm(request.POST, instance=role)
    else:
        form = RoleChangeForm(instance=role)
    form.fields["users"].queryset = User.objects.all()
    form.fields["roles"].queryset = policy.admin_roles.all()

    if request.method == 'POST' and form.is_valid():
        # Processing users modifications: store the default alias of each
        # selected user instead of the user itself.
        aliases = []
        for user_id in request.POST.getlist('users'):
            alias = set_default_alias(User.objects.get(id=user_id))
            if alias:
                aliases.append(alias)
        form.cleaned_data['users'] = aliases
        # Processing roles modifications: a role must not contain itself.
        # The original guard compared the posted id with role.name and
        # threw its result away, so it never had any effect.
        form.cleaned_data['roles'] = [
            member for member in (form.cleaned_data.get('roles') or [])
            if member.id != role.id]
        # Modification authorized, save role.
        form.save()
        messages.add_message(request, messages.INFO,
            _('Role %s modified') % role)
    return return_mod_admin_role(request, role, form)
@prevent_access_to_not_policy_root_administrators
def return_mod_admin_role(request, role, form):
    '''
    Render 'mod_role.html' for the given administration role and form,
    using the template parameters built by display_for_mod_admin_role.
    '''
    return return_mod_any(
        request,
        form,
        _('Modify an administration role'),
        display_for_mod_admin_role(request, role),
        'mod_role.html')
@prevent_access_to_not_policy_root_administrators
def display_for_mod_admin_role(request, role=None):
    '''
    Build the template parameters used to modify an administration role.

    Returns a dict with:
    - 'users_to_display': (user, flag) pairs for every User, flag being
      'checked' when the alias returned by set_default_alias for that
      user is in role.users, '' otherwise
    - 'roles_to_display': (role, flag) pairs for every administration
      role of the policy except the edited one
    - 'role': the edited role
    - 'backlink': the URL of the administration roles list

    When role is None nothing is flagged and no role is left out.
    '''
    policy = get_policy_from_session(request)
    # Fetch the memberships once instead of once per candidate.
    if role is None:
        member_aliases, member_roles, role_id = [], [], None
    else:
        member_aliases = list(role.users.all())
        member_roles = list(role.roles.all())
        role_id = role.id

    tpl_p = {}
    tpl_p['users_to_display'] = []
    for user in User.objects.all():
        flag = 'checked' if set_default_alias(user) in member_aliases \
            else ''
        tpl_p['users_to_display'].append((user, flag))

    tpl_p['roles_to_display'] = []
    for candidate in policy.admin_roles.all():
        # The edited role is never offered as a member of itself.
        if role is not None and candidate.id == role_id:
            continue
        flag = 'checked' if candidate in member_roles else ''
        tpl_p['roles_to_display'].append((candidate, flag))

    tpl_p['role'] = role
    tpl_p['backlink'] = '/list_admin_roles'
    return tpl_p
@csrf_exempt
@prevent_access_to_not_policy_root_administrators
def mod_admin_view(request):
    '''
    Display (GET) or apply (POST) the modification of an administration
    view of the policy stored in the session.

    The view id is read from the 'admin_view' GET parameter or the 'view'
    POST field. An unknown view, a view that is not an administration view
    of the policy, or another HTTP method redirects to the administration
    views list with an error message.
    '''
    policy = get_policy_from_session(request)
    list_url = '/list_admin_views'

    if request.method == 'GET':
        view_id = request.GET.get('admin_view')
    elif request.method == 'POST':
        view_id = request.POST.get('view')
    else:
        messages.add_message(request, messages.ERROR,
            _('Unknown HTTP method %s') % request.method)
        return HttpResponseRedirect(list_url)

    view = None
    if view_id:
        try:
            view = View.objects.get(id=view_id)
        except (View.DoesNotExist, ValueError, TypeError):
            # Unknown or malformed id; reported just below.
            view = None
    if view is None:
        messages.add_message(request, messages.ERROR,
            _('Unknown administration view'))
        return HttpResponseRedirect(list_url)
    if view not in policy.admin_views.all():
        messages.add_message(request, messages.ERROR,
            _('%s is not an administration view of %s') % (view, policy))
        # Back to the views list (the original sent the user to the
        # administration roles list here).
        return HttpResponseRedirect(list_url)

    if request.method == 'POST':
        form = AdminViewChangeForm(request.POST, instance=view)
    else:
        form = AdminViewChangeForm(instance=view)
    # Same restrictions for display and for validation; the original
    # forgot the acs_objects restriction when handling a POST.
    namespace = policy.namespace
    form.fields["users"].queryset = \
        UserAlias.objects.filter(namespace=namespace)
    form.fields["roles"].queryset = \
        Role.objects.filter(namespace=namespace)
    form.fields["acs_objects"].queryset = \
        AcsObject.objects.filter(namespace=namespace)
    # NOTE(review): any View is accepted here while the page only offers
    # the policy namespace views and admin views - confirm whether this
    # queryset should be narrowed accordingly.
    form.fields["views"].queryset = View.objects.all()
    form.fields["actions"].queryset = \
        Action.objects.filter(namespace=namespace)
    form.fields["activities"].queryset = \
        Activity.objects.filter(namespace=namespace)

    if request.method == 'POST' and form.is_valid():
        # The original rebuilt per-field "registered"/"new" lists with
        # is_in_policy but never used them; only the evident intent of the
        # views loop is kept: a view must not contain itself (its guard
        # compared the posted id with view.name, so it never matched).
        form.cleaned_data['views'] = [
            sub for sub in (form.cleaned_data.get('views') or [])
            if sub.id != view.id]
        # Modification authorized, save view.
        form.save()
        messages.add_message(request, messages.INFO,
            _('View %s modified') % view)
    return return_mod_admin_view(request, view, form)
@prevent_access_to_not_policy_root_administrators
def return_mod_admin_view(request, view, form):
    '''
    Render 'mod_admin_view.html' for the given administration view and
    form, using the template parameters built by
    display_for_mod_admin_view.
    '''
    return return_mod_any(
        request,
        form,
        _('Modify an administration view'),
        display_for_mod_admin_view(request, view),
        'mod_admin_view.html')
@prevent_access_to_not_policy_root_administrators
def display_for_mod_admin_view(request, view):
    '''
    Build the template parameters used to modify an administration view.

    Returns a dict holding, for users, roles, objects, views, actions and
    activities, a '<kind>_to_display' list of (entity, flag) pairs where
    flag is 'checked' when the entity already belongs to the view and ''
    otherwise. Candidates are the entities of the policy namespace; for
    views the policy's admin views are appended and the edited view is
    left out. 'view' holds the edited view and 'backlink' the URL of the
    administration views list.
    '''
    policy = get_policy_from_session(request)
    namespace = policy.namespace

    def flag_members(candidates, members):
        # Evaluate the membership once instead of once per candidate.
        members = list(members)
        return [(item, 'checked' if item in members else '')
                for item in candidates]

    # Namespace views first, then admin views; the edited view is never
    # offered as a member of itself.
    view_candidates = [
        candidate for candidate in
        list(View.objects.filter(namespace=namespace))
        + list(policy.admin_views.all())
        if candidate.id != view.id]

    tpl_p = {}
    tpl_p['users_to_display'] = flag_members(
        UserAlias.objects.filter(namespace=namespace), view.users.all())
    tpl_p['roles_to_display'] = flag_members(
        Role.objects.filter(namespace=namespace), view.roles.all())
    tpl_p['objects_to_display'] = flag_members(
        AcsObject.objects.filter(namespace=namespace),
        view.acs_objects.all())
    tpl_p['views_to_display'] = flag_members(
        view_candidates, view.views.all())
    tpl_p['actions_to_display'] = flag_members(
        Action.objects.filter(namespace=namespace), view.actions.all())
    tpl_p['activities_to_display'] = flag_members(
        Activity.objects.filter(namespace=namespace),
        view.activities.all())
    tpl_p['view'] = view
    tpl_p['backlink'] = '/list_admin_views'
    return tpl_p