# Listing metadata: 1211 lines, 60 KiB, Python
'''
|
|
VERIDIC - Towards a centralized access control system
|
|
|
|
Copyright (C) 2011 Mikael Ates
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU Affero General Public License as
|
|
published by the Free Software Foundation, either version 3 of the
|
|
License, or (at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU Affero General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Affero General Public License
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
'''
|
|
|
|
import logging
|
|
import re
|
|
import random
|
|
import string
|
|
|
|
from django.contrib import messages
|
|
from django.utils.translation import ugettext as _
|
|
from django.shortcuts import render_to_response
|
|
from django.template import RequestContext
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from django.contrib.auth.decorators import login_required
|
|
from django.http import HttpResponseRedirect
|
|
from django.conf import settings
|
|
from django.db import transaction
|
|
|
|
from attribute_aggregator.models import AttributeSource, AttributeData
|
|
from attribute_aggregator.xacml_constants import *
|
|
from attribute_aggregator.core import get_all_attribute_definitions, \
|
|
get_all_sources, get_attribute_type_of_definition, \
|
|
convert_from_string
|
|
from attribute_aggregator.mapping import ATTRIBUTE_MAPPING
|
|
|
|
from acs.abac.core import remove_rule
|
|
from acs.abac.logic import is_proposition
|
|
from acs.abac.models import AbacRule, AssertionData, AssertionDefinition, \
|
|
PredicateRequired, PredicateRole, PredicateComparison
|
|
|
|
from core import is_policy_action_creator, is_policy_object_creator, \
|
|
is_policy_user_administrator, is_in_policy, is_valid_regex, \
|
|
is_admin_permission_in_policy, is_authorized_by_names, \
|
|
isAuthorizedRBAC2, is_authorized_with_object_in_regexp, \
|
|
set_default_alias, \
|
|
is_policy_root_administrator, \
|
|
filter_list_in_namespace, \
|
|
return_list_users_authorized_for_admin, \
|
|
return_list_roles_authorized_for_admin, \
|
|
return_list_actions_authorized_for_admin, \
|
|
return_list_objects_authorized_for_admin, \
|
|
return_list_views_authorized_for_admin, \
|
|
return_list_activities_authorized_for_admin, \
|
|
check_abac_permission, get_alias_in_policy
|
|
|
|
from models import UserAlias, Role, AcsObject, View, Action, Activity, \
|
|
AcsAbacPermission
|
|
|
|
from decorators import prevent_access_to_normal_users,\
|
|
check_policy_in_session, \
|
|
prevent_access_to_not_user_administrators, \
|
|
check_authorized_on_users_and_roles, \
|
|
check_authorized_on_objects_and_views, \
|
|
check_authorized_on_actions_and_activities, \
|
|
prevent_access_to_not_self_administrators_or_normal_users, \
|
|
check_authorized_for_abac
|
|
|
|
from views import check_object_or_view, check_action_or_activity, \
|
|
return_add_any
|
|
|
|
from utils_views import get_policy_from_session, \
|
|
get_who_from_one_post_field, \
|
|
get_what_from_one_post_field, \
|
|
get_how_from_one_post_field
|
|
|
|
|
|
# Logger shared by the ACS views; the 'acs' name is what the logging
# configuration has to target.
logger = logging.getLogger('acs')

# URLs read once from the Django settings at import time.
login_url = settings.LOGIN_URL
root_url = settings.ROOT_URL
|
|
|
|
#
# Permission management views
#
|
|
|
|
# Identifiers of the two predicate kinds defined by ACS itself; every other
# predicate type handled below comes from the XACML comparison constants.
PREDICATE_REQUIRED = \
    "urn:entrouvert:acs:constants:predicate-required"

PREDICATE_ROLE = \
    "urn:entrouvert:acs:constants:predicate-role"

# Human readable label of every predicate type, keyed by type identifier.
# Entries of XACML_COMPARISON_TYPE_DIC take precedence on a key clash, as
# they did when both item lists were concatenated.
# NOTE(review): `_` is the eager ugettext, so these labels are translated
# once, at import time - confirm that this is acceptable.
PREDICATE_TYPE_EXPL_DIC = {
    PREDICATE_REQUIRED: _('Require attribute presence'),
    PREDICATE_ROLE: _('Require role'),
}
PREDICATE_TYPE_EXPL_DIC.update(XACML_COMPARISON_TYPE_DIC)

# Every predicate type identifier accepted by the views.
PREDICATE_TYPES = (PREDICATE_REQUIRED, PREDICATE_ROLE, ) \
    + ACS_XACML_COMPARISON \
    + XACML_COMPARISON_EQUALITY

# (identifier, label) pairs, in the order they are concatenated here.
PREDICATE_TYPES_TYPE = ((PREDICATE_REQUIRED, _('Require attribute presence')),
        (PREDICATE_ROLE, _('Require role')), ) \
    + XACML_COMPARISON_DIFF_TYPE + XACML_COMPARISON_EQUALITY_TYPE
|
|
|
|
|
|
# Multivalues treatment options a user may submit at step two.
_ABAC_MULTIVALUES_OPTIONS = (
    'EQUAL_ONE_VALUE',
    'EQUAL_OP1_SUBSET_OP2',
    'EQUAL_EXACT_MATCH',
    'DIFF_ALL_OP1_WITH_UPPER_LIMIT_OP2',
    'DIFF_ALL_OP1_WITH_BOTTOM_LIMIT_OP2',
    'DIFF_ONE_OP1_WITH_UPPER_LIMIT_OP2',
    'DIFF_ONE_OP1_WITH_BOTTOM_LIMIT_OP2',
)


def _abac_error(request, text):
    '''Queue text as an error-level message for the current request.'''
    messages.add_message(request, messages.ERROR, text)


def _abac_info(request, text):
    '''Queue text as an info-level message for the current request.'''
    messages.add_message(request, messages.INFO, text)


def _abac_clear_session(request):
    '''Drop all the rule-building state kept in the session.'''
    for key in ('working_predicate', 'predicates', 'rule'):
        if key in request.session:
            request.session.pop(key)


def _abac_store_working_predicate(request, working_predicate):
    '''
    Write the working predicate back into the session.

    The entry is removed and assigned again, rather than only mutated in
    place, so that the change goes through the session item assignment.
    '''
    if 'working_predicate' in request.session:
        request.session.pop('working_predicate')
    request.session['working_predicate'] = working_predicate


def _abac_predicate_error(request):
    '''Return an error text if no typed working predicate exists, else None.'''
    if 'working_predicate' not in request.session:
        return _('No working predicate')
    if 'type' not in request.session['working_predicate']:
        return _('Missing predicate type')
    return None


def _abac_required_error(request):
    '''Return an error text unless the working predicate is a required attribute.'''
    error = _abac_predicate_error(request)
    if error is None and \
            request.session['working_predicate']['type'] != PREDICATE_REQUIRED:
        error = _('Working predicate is not a required attribute')
    return error


def _abac_comparison_error(request):
    '''
    Return an error text unless the working predicate is a comparison.

    Both ACS specific kinds (required attribute and role) are refused: the
    comparison steps only make sense for the XACML comparison types.
    '''
    error = _abac_predicate_error(request)
    if error is None and request.session['working_predicate']['type'] \
            in (PREDICATE_REQUIRED, PREDICATE_ROLE):
        error = _('Working predicate is not a comparison')
    return error


def _abac_operand_error(request, need_definition=True):
    '''
    Return an error text unless a comparison operand is being defined.

    With need_definition the operand must also already carry the name of
    its attribute definition.
    '''
    error = _abac_comparison_error(request)
    if error is not None:
        return error
    working_predicate = request.session['working_predicate']
    if 'operand1_defined' in working_predicate \
            and 'operand2_defined' in working_predicate:
        return _('Both operands already defined')
    if 'working_operand' not in working_predicate \
            or 'type' not in working_predicate['working_operand']:
        return _('There is no valid operand definition in progress')
    if need_definition and \
            'definition_name' not in working_predicate['working_operand']:
        return _('Missing definition of the working operand')
    return None


def _abac_get_source(source_id):
    '''Return the AttributeSource with this id, or None if there is none.'''
    try:
        return AttributeSource.objects.get(id=source_id)
    except (AttributeSource.DoesNotExist, ValueError, TypeError):
        # ValueError/TypeError: the submitted id is not a usable key.
        return None


def _abac_cancel_all(request):
    '''Abandon the rule under construction.'''
    _abac_clear_session(request)
    _abac_info(request, _('Abac rule creation cancelled.'))


def _abac_delete_working_predicate(request):
    '''Discard the predicate currently being defined.'''
    if 'working_predicate' not in request.session:
        _abac_error(request, _('No working predicate to delete'))
        return
    request.session.pop('working_predicate')
    _abac_info(request, _('Working predicate removed'))


def _abac_delete_working_operand(request):
    '''Discard the operand currently being defined.'''
    if 'working_predicate' not in request.session:
        _abac_error(request, _('No working predicate'))
        return
    working_predicate = request.session['working_predicate']
    if 'working_operand' not in working_predicate:
        _abac_error(request, _('No working operand to delete'))
        return
    working_predicate.pop('working_operand')
    _abac_store_working_predicate(request, working_predicate)
    _abac_info(request, _('Working operand removed'))


def _abac_new_predicate(request):
    '''Start a new working predicate of the submitted type.'''
    if 'working_predicate' in request.session:
        _abac_error(request,
            _('You must finish to complete the working predicate '
              'before defining a new one'))
        return
    predicate_type = request.POST['predicate_type']
    if predicate_type not in PREDICATE_TYPE_EXPL_DIC:
        _abac_error(request,
            _('Unknown predicate type %s') % predicate_type)
        return
    request.session['working_predicate'] = {
        'type': predicate_type,
        'type_friendly': PREDICATE_TYPE_EXPL_DIC[predicate_type],
        # A first operand, given as a definition, is opened right away.
        'working_operand': {'type': 'definition'},
    }


def _abac_close_working_predicate(request):
    '''Append the working predicate to the list of recorded predicates.'''
    error = _abac_predicate_error(request)
    if error is not None:
        _abac_error(request, error)
        return
    predicates = []
    if 'predicates' in request.session:
        predicates = request.session.pop('predicates')
    predicates.append(request.session.pop('working_predicate'))
    request.session['predicates'] = predicates
    _abac_info(request, _('Predicate recorded'))


def _abac_delete_predicate(request):
    '''
    Remove the recorded predicate whose 1-based position was submitted.

    Positions are the identifiers used in the rule, so removing a predicate
    renumbers the following ones: the stored rule is dropped as well, since
    it would otherwise silently designate other predicates.
    '''
    if 'predicates' not in request.session:
        _abac_error(request, _('No predicate to delete'))
        return
    predicates = request.session['predicates']
    try:
        position = int(request.POST['predicate_id'])
    except ValueError:
        position = 0
    # Values below 1 must be refused too: list.pop() would otherwise wrap
    # around and remove a predicate counted from the end.
    if position < 1 or position > len(predicates):
        _abac_error(request, _('Unknown predicate'))
        return
    predicates.pop(position - 1)
    request.session.pop('predicates')
    if predicates:
        request.session['predicates'] = predicates
    rule_dropped = 'rule' in request.session
    if rule_dropped:
        request.session.pop('rule')
    _abac_info(request, _('Predicate removed'))
    if rule_dropped and predicates:
        _abac_info(request,
            _('Remaining predicates have been renumbered, '
              'the rule must be defined again'))


def _abac_select_required_definition(request):
    '''Set the attribute definition of a required-attribute predicate.'''
    error = _abac_required_error(request)
    if error is None \
            and request.POST['attribute_definition'] not in ATTRIBUTE_MAPPING:
        error = _('Unknown attribute definition')
    if error is not None:
        _abac_error(request, error)
        return
    working_predicate = request.session['working_predicate']
    working_predicate['definition_name'] = request.POST['attribute_definition']
    if 'singlevalued' in request.POST:
        working_predicate['singlevalued'] = 'singlevalued'
    _abac_store_working_predicate(request, working_predicate)


def _abac_select_required_source(request):
    '''Add a source to a required-attribute predicate.'''
    error = _abac_required_error(request)
    if error is not None:
        _abac_error(request, error)
        return
    source = _abac_get_source(request.POST['source_id'])
    if source is None:
        _abac_error(request, _('Source not found'))
        return
    working_predicate = request.session['working_predicate']
    if 'sources_selected' in working_predicate:
        if 'singlevalued' in working_predicate:
            _abac_error(request,
                _('Attribute must be single-valued, a unique source can be provided, source %s has already be given') \
                    % working_predicate['sources_selected'][0][1])
            return
        if (source.id, source.name) in working_predicate['sources_selected']:
            _abac_error(request, _('Source already added'))
            return
    else:
        working_predicate['sources_selected'] = []
    working_predicate['sources_selected'].append((source.id, source.name))
    _abac_store_working_predicate(request, working_predicate)


def _abac_select_role(request):
    '''Set the role of a role predicate.'''
    error = _abac_predicate_error(request)
    if error is None and \
            request.session['working_predicate']['type'] != PREDICATE_ROLE:
        error = _('Working predicate is not a required role')
    if error is not None:
        _abac_error(request, error)
        return
    try:
        role = Role.objects.get(id=request.POST['role_id'])
    except (Role.DoesNotExist, ValueError, TypeError):
        _abac_error(request, _('Role not found'))
        return
    working_predicate = request.session['working_predicate']
    # The model instance itself is kept in the session, as before.
    working_predicate['role'] = role
    _abac_store_working_predicate(request, working_predicate)


def _abac_multivalue_step_one(request):
    '''Record which operands of a comparison are single-valued.'''
    error = _abac_comparison_error(request)
    if error is not None:
        _abac_error(request, error)
        return
    working_predicate = request.session['working_predicate']
    working_predicate['multivalues_step_one'] = 'multivalues_step_one'
    one_single = 'operandone_singlevalued' in request.POST
    two_single = 'operandtwo_singlevalued' in request.POST
    if one_single:
        working_predicate['operandone_singlevalued'] = 'singlevalued'
    if two_single:
        working_predicate['operandtwo_singlevalued'] = 'singlevalued'
    if one_single and two_single:
        working_predicate['multivalues_step_two'] = 'NO_MULTIVALUES'
    # NOTE(review): for an equality comparison this also applies when both
    # operands are single-valued and then replaces 'NO_MULTIVALUES' - kept
    # as it was, confirm that this is intended.
    if (one_single or two_single) \
            and working_predicate['type'] in XACML_COMPARISON_EQUALITY:
        working_predicate['multivalues_step_two'] = 'EQUAL_ONE_VALUE'
        working_predicate['multivalues_explanation'] = \
            _('One of the two operand might be multivalued, then the comparison will search that at least one value of multi values is equal to the value of the other attribute.')
    _abac_store_working_predicate(request, working_predicate)


def _abac_multivalues_explanation(option, predicate_type):
    '''
    Return the explanation of a multivalues option for a predicate type.

    NOTE(review): a DIFF_* option submitted for a type that is in none of
    the four ACS_XACML_COMPARISON_* groups is explained with 'None' as the
    comparison, as before - confirm whether it should be refused instead.
    '''
    comparison = None
    if predicate_type in ACS_XACML_COMPARISON_LT:
        comparison = 'less than'
    elif predicate_type in ACS_XACML_COMPARISON_LT_OE:
        comparison = 'less than or equal to'
    elif predicate_type in ACS_XACML_COMPARISON_GRT:
        comparison = 'greater than'
    elif predicate_type in ACS_XACML_COMPARISON_GRT_OE:
        comparison = 'greater than or equal to'

    if option == 'DIFF_ALL_OP1_WITH_UPPER_LIMIT_OP2':
        return _('All values of operand one must be %s the highest value of operand two.') % comparison
    if option == 'DIFF_ALL_OP1_WITH_BOTTOM_LIMIT_OP2':
        return _('All values of operand one must be %s the smallest value of operand two.') % comparison
    if option == 'DIFF_ONE_OP1_WITH_UPPER_LIMIT_OP2':
        return _('At least one value of operand one must be %s the highest value of operand two.') % comparison
    if option == 'DIFF_ONE_OP1_WITH_BOTTOM_LIMIT_OP2':
        return _('At least one value of operand one must be %s the smallest value of operand two.') % comparison
    if option == 'EQUAL_OP1_SUBSET_OP2':
        return _('The first attribute may have multiple values and each must be equal to a value of the second attribute (subset).')
    if option == 'EQUAL_EXACT_MATCH':
        return _('The first attribute may have multiple values that exactly match the values of the second attribute.')
    # Only 'EQUAL_ONE_VALUE' is left among the accepted options.
    return _('The first attribute may have multiple values and at least one must be equal to a value of the second attribute.')


def _abac_multivalue_step_two(request):
    '''Record how multiple values of the operands must be compared.'''
    error = _abac_comparison_error(request)
    if error is None and 'multivalues_step_one' \
            not in request.session['working_predicate']:
        error = _('the first mutltivalue configuration step has not yet been completed')
    if error is None \
            and request.POST['multivalues'] not in _ABAC_MULTIVALUES_OPTIONS:
        error = _('Unknown multivalues treatment option')
    if error is not None:
        _abac_error(request, error)
        return
    working_predicate = request.session['working_predicate']
    option = request.POST['multivalues']
    working_predicate['multivalues_step_two'] = option
    working_predicate['multivalues_explanation'] = \
        _abac_multivalues_explanation(option, working_predicate['type'])

    logger.debug('add_abac_permission: predicate %s',
        working_predicate['type'])
    logger.debug('add_abac_permission: multivalues %s',
        working_predicate['multivalues_step_two'])
    logger.debug('add_abac_permission: explanation %s',
        working_predicate['multivalues_explanation'])

    _abac_store_working_predicate(request, working_predicate)


def _abac_new_operand(request):
    '''
    Open a new operand, given either as a definition or as raw values.

    An operand made of values takes the attribute definition of operand
    one, which therefore has to be defined beforehand.
    '''
    as_definition = 'operand_is_definition' in request.POST
    error = _abac_comparison_error(request)
    if error is None:
        working_predicate = request.session['working_predicate']
        if 'operand1_defined' in working_predicate \
                and 'operand2_defined' in working_predicate:
            error = _('Both operands already defined')
        elif 'working_operand' in working_predicate:
            error = _('There is already an operand definition in progress')
        elif not as_definition \
                and 'operand1_defined' not in working_predicate:
            error = _('Operand one must be defined before giving values')
    if error is not None:
        _abac_error(request, error)
        return
    if as_definition:
        working_predicate['working_operand'] = {'type': 'definition'}
    else:
        working_predicate['working_operand'] = {
            'type': 'value',
            # Definition type is the same as operand one
            'definition_name':
                working_predicate['operand1_defined']['definition_name'],
        }
    _abac_store_working_predicate(request, working_predicate)


def _abac_select_operand_definition(request):
    '''Set the attribute definition of the working operand.'''
    error = _abac_operand_error(request, need_definition=False)
    if error is None \
            and request.POST['attribute_definition'] not in ATTRIBUTE_MAPPING:
        error = _('Unknown attribute definition')
    if error is not None:
        _abac_error(request, error)
        return
    working_predicate = request.session['working_predicate']
    definition = request.POST['attribute_definition']
    if get_attribute_type_of_definition(definition) \
            != ACS_COMP_TYPE[working_predicate['type']]:
        _abac_error(request,
            _('The attribute type does not match the comparison type'))
        return
    working_predicate['working_operand']['definition_name'] = definition
    _abac_store_working_predicate(request, working_predicate)


def _abac_select_operand_source(request):
    '''Add a source to the working operand.'''
    error = _abac_operand_error(request)
    if error is not None:
        _abac_error(request, error)
        return
    source = _abac_get_source(request.POST['source_operand_id'])
    if source is None:
        _abac_error(request, _('Source not found'))
        return
    working_predicate = request.session['working_predicate']
    operand = working_predicate['working_operand']
    if 'sources_selected' in operand:
        # The working operand is operand one until operand one is closed.
        first_operand = 'operand1_defined' not in working_predicate
        if first_operand and 'operandone_singlevalued' in working_predicate:
            _abac_error(request,
                _('Operand one is single-valued, a unique source can be provided, source %s has already be given') \
                    % operand['sources_selected'][0][1])
            return
        if not first_operand \
                and 'operandtwo_singlevalued' in working_predicate:
            _abac_error(request,
                _('Operand two is single-valued, a unique source can be provided, source %s has already be given') \
                    % operand['sources_selected'][0][1])
            return
        if (source.id, source.name) in operand['sources_selected']:
            _abac_error(request, _('Source already added'))
            return
    else:
        operand['sources_selected'] = []
    operand['sources_selected'].append((source.id, source.name))
    _abac_store_working_predicate(request, working_predicate)


def _abac_add_operand_value(request):
    '''Add a raw value to the working operand.'''
    error = _abac_operand_error(request)
    if error is not None:
        _abac_error(request, error)
        return
    working_predicate = request.session['working_predicate']
    operand = working_predicate['working_operand']
    value = request.POST['value_operand']
    if 'values_selected' in operand:
        if 'operandtwo_singlevalued' in working_predicate:
            _abac_error(request,
                _('Operand two is single-valued, value %s has already be given') \
                    % operand['values_selected'][0])
            return
        if value in operand['values_selected']:
            _abac_error(request, _('Value already added'))
            return
    else:
        operand['values_selected'] = []
    operand['values_selected'].append(value)
    _abac_store_working_predicate(request, working_predicate)


def _abac_close_working_operand(request):
    '''Record the working operand as operand one, or two if one exists.'''
    error = _abac_operand_error(request)
    if error is None:
        operand = request.session['working_predicate']['working_operand']
        if 'values_selected' not in operand \
                and 'sources_selected' not in operand:
            error = _('Missing values or sources for the working operand')
    if error is not None:
        _abac_error(request, error)
        return
    working_predicate = request.session['working_predicate']
    operand = working_predicate.pop('working_operand')
    if 'operand1_defined' in working_predicate:
        working_predicate['operand2_defined'] = operand
    else:
        working_predicate['operand1_defined'] = operand
    _abac_store_working_predicate(request, working_predicate)


def _abac_set_rule(request):
    '''
    Validate and store the logical expression combining the predicates.

    The expression may only contain identifiers of recorded predicates
    (their 1-based position) and the characters ' ', '(', ')', '&', '|', '-'.
    '''
    if 'predicates' not in request.session:
        _abac_error(request, _('No predicates to define the rule'))
        return
    rule_string = request.POST['rule_string']
    known_ids = [str(position) for position
        in range(1, len(request.session['predicates']) + 1)]
    # Identifiers are compared as whole numbers: removing them digit by
    # digit would accept "11" when a single predicate exists.
    rejected = [token for token in re.findall(r'[0-9]+', rule_string)
        if token not in known_ids]
    leftover = re.sub(r'[0-9 ()&|-]', '', rule_string)
    if leftover:
        rejected.append(leftover)
    if rejected:
        check = ' '.join(rejected)
        _abac_error(request,
            _('The logical expression contains unknown '
              'predicates or unauthorized characters (%s)') % check)
        logger.debug('add_abac_permission: check failure: %s', check)
    elif not is_proposition(rule_string):
        # The logical expression is not well-formed
        _abac_error(request, _('The logical expression is malformed'))
    else:
        if 'rule' in request.session:
            request.session.pop('rule')
        _abac_info(request, _('The logical expression is well-formed'))
        request.session['rule'] = rule_string


def _abac_add_permission(request):
    '''
    Create the permission from the recorded predicates and rule.

    Returns the response to send: a redirect once the permission is
    created, the form with an error message otherwise.
    '''
    error = None
    if 'predicates' not in request.session:
        error = _('Mising predicates')
    elif 'rule' not in request.session:
        error = _('Missing rule')
    elif not request.POST.get('what_matches'):
        error = _('No object for What provided')
    elif not request.POST.get('how_matches'):
        error = _('No object for How provided')
    if error is not None:
        # Stop here: nothing may be created from an incomplete definition.
        _abac_error(request, error)
        return return_add_abac_permission_form(request)

    who = None
    # '_' is not looked up: who stays None in that case.
    if 'who_matches' in request.POST \
            and request.POST['who_matches'] != '_':
        try:
            who = get_who_from_one_post_field(request, 'who_matches')
        except Exception as err:
            logger.error('add_permission: Fail to find who due to %s', err)
            _abac_error(request, _('Fail to find who due to %s') % err)
            return return_add_abac_permission_form(request)
        if not isinstance(who, UserAlias):
            logger.error('add_permission: who should only be a user')
            _abac_error(request,
                _("Who should only be a user or 'Anybody'"))
            return return_add_abac_permission_form(request)

    try:
        what = get_what_from_one_post_field(request, 'what_matches')
        how = get_how_from_one_post_field(request, 'how_matches')
    except Exception as err:
        logger.error('add_permission: Fail to find an object due to %s', err)
        _abac_error(request, _('Fail to find an object due to %s') % err)
        return return_add_abac_permission_form(request)

    if not check_object_or_view(request, what) \
            or not check_action_or_activity(request, how):
        return return_add_abac_permission_form(request)

    p, message = \
        check_data_and_create_permission(request, who, what, how)
    if not p:
        _abac_error(request, message)
        return return_add_abac_permission_form(request)

    _abac_clear_session(request)
    message += str(p)
    _abac_info(request, message)
    return HttpResponseRedirect('add_permission_any')


# NOTE(review): csrf_exempt disables the CSRF protection on a view that
# modifies the session and creates permissions - confirm that it is needed.
@csrf_exempt
@check_policy_in_session
@check_authorized_for_abac
def add_abac_permission(request):
    '''
    Add permissions into policies

    Step by step builder of an ABAC permission. The state is kept in the
    session under three keys: 'working_predicate' (predicate being
    defined), 'predicates' (recorded predicates) and 'rule' (logical
    expression over the 1-based positions of the recorded predicates).

    Each POST carries one action, recognised by the submitted field names;
    the first matching action below is the only one processed. A request
    that is not a POST only displays the form.

    Returns the form, or a redirect once the permission has been created.
    '''
    if request.method != 'POST':
        return return_add_abac_permission_form(request)

    post = request.POST

    if 'cancel_all' in post:
        _abac_cancel_all(request)
    elif 'delete_working_predicate' in post:
        _abac_delete_working_predicate(request)
    elif 'delete_working_operand' in post:
        _abac_delete_working_operand(request)

    # Predicate choice: creation, validation, deletion
    elif 'predicate_type' in post and 'new_predicate' in post:
        _abac_new_predicate(request)
    elif 'close_working_predicate' in post:
        _abac_close_working_predicate(request)
    elif 'delete_predicate' in post and 'predicate_id' in post:
        _abac_delete_predicate(request)

    # Predicate required: definition, single-valued flag, sources
    elif 'select_attribute_definition' in post \
            and 'attribute_definition' in post:
        _abac_select_required_definition(request)
    elif 'select_source' in post and 'source_id' in post:
        _abac_select_required_source(request)

    # Predicate role: role selection
    elif 'select_role' in post and 'role_id' in post:
        _abac_select_role(request)

    # Predicate comparison: multivalues options, then operands
    elif 'select_multivalue_step_one' in post:
        _abac_multivalue_step_one(request)
    elif 'select_multivalue_step_two' in post and 'multivalues' in post:
        _abac_multivalue_step_two(request)
    elif 'operand_is_definition' in post \
            or 'operand_is_definition_data' in post:
        _abac_new_operand(request)
    elif 'select_attribute_definition_operand' in post \
            and 'attribute_definition' in post:
        _abac_select_operand_definition(request)
    elif 'select_source_operand' in post and 'source_operand_id' in post:
        _abac_select_operand_source(request)
    elif 'value_operand' in post and 'value_operand_submitted' in post:
        _abac_add_operand_value(request)
    elif 'close_working_operand' in post:
        _abac_close_working_operand(request)

    # Rule definition
    elif 'set_rule' in post and 'rule_string' in post:
        _abac_set_rule(request)

    # Permission definition
    elif 'add_permission' in post:
        return _abac_add_permission(request)

    else:
        _abac_error(request, _('Unknown action'))

    return return_add_abac_permission_form(request)
|
|
|
|
|
|
# Values accepted for the 'multivalues_step_two' option of a comparison
# predicate.
_MULTIVALUES_OPTIONS = (
    'NO_MULTIVALUES',
    'EQUAL_ONE_VALUE',
    'EQUAL_OP1_SUBSET_OP2',
    'EQUAL_EXACT_MATCH',
    'DIFF_ALL_OP1_WITH_UPPER_LIMIT_OP2',
    'DIFF_ALL_OP1_WITH_BOTTOM_LIMIT_OP2',
    'DIFF_ONE_OP1_WITH_UPPER_LIMIT_OP2',
    'DIFF_ONE_OP1_WITH_BOTTOM_LIMIT_OP2',
)

# A friendly predicate identifier inside a logical expression. A whole run
# of digits is matched so that '1' is never found inside '10'.
_PREDICATE_ID_RE = re.compile(r'[0-9]+')


def _build_required_predicate(predicate, p_id, rule):
    '''
    Build, without saving it, the PredicateRequired described by the
    session dictionary predicate.

    An AssertionDefinition is created and saved for the definition name and
    every selected source is attached to it. Raise Exception with an
    explanatory message when a key is missing or an object cannot be
    created or found.
    '''
    if 'definition_name' not in predicate:
        raise Exception('Missing definition of predicate %s' % p_id)
    if 'sources_selected' not in predicate:
        raise Exception('Missing sources for predicate %s' % p_id)
    try:
        ad = AssertionDefinition(definition=predicate['definition_name'])
        ad.save()
    except Exception:
        raise Exception('Unable to create assertion for predicate %s' % p_id)
    for s_id, s_name in predicate['sources_selected']:
        try:
            source = AttributeSource.objects.get(id=s_id)
        except Exception:
            raise Exception('Unable to find source (%s, %s) for predicate %s'
                % (s_id, s_name, p_id))
        try:
            ad.add_source(source)
        except Exception:
            raise Exception('Unable to add source %s to predicate %s'
                % (source, p_id))
    return PredicateRequired(assertion_definition=ad,
        single_value='singlevalued' in predicate, rule=rule)


def _build_comparison_predicate(predicate, p_id, rule):
    '''
    Build, without saving it, the PredicateComparison described by the
    session dictionary predicate.

    The multivalues option and both operands are validated first, then the
    assertions backing the operands are created through handle_operand.
    Raise Exception with an explanatory message on invalid data.
    '''
    if 'multivalues_step_two' not in predicate \
            or 'multivalues_explanation' not in predicate \
            or predicate['multivalues_step_two'] not in _MULTIVALUES_OPTIONS:
        raise Exception('Missing suitable option for multivalues treatment')

    operand1_single_value = 'operandone_singlevalued' in predicate
    operand2_single_value = 'operandtwo_singlevalued' in predicate
    # When both operands are flagged single-valued, 'NO_MULTIVALUES' is the
    # only accepted option.
    if operand1_single_value and operand2_single_value \
            and predicate['multivalues_step_two'] != 'NO_MULTIVALUES':
        raise Exception('Unacceptable option for multivalues treatment')

    for name in ('operand1', 'operand2'):
        key = name + '_defined'
        if key not in predicate:
            raise Exception('Missing %s for predicate %s' % (name, p_id))
        if 'type' not in predicate[key]:
            raise Exception('Missing type of %s for predicate %s'
                % (name, p_id))
        if 'definition_name' not in predicate[key]:
            raise Exception('Missing definition of %s of predicate %s'
                % (name, p_id))

    d1 = predicate['operand1_defined']['definition_name']
    d2 = predicate['operand2_defined']['definition_name']
    if get_attribute_type_of_definition(d1) \
            != get_attribute_type_of_definition(d2):
        raise Exception(
            'Data types of the two operands of predicate %s differ' % p_id)

    a1 = handle_operand(predicate, p_id, 'operand1', d1)
    a2 = handle_operand(predicate, p_id, 'operand2', d2)

    return PredicateComparison(operand1=a1, operand2=a2,
        operand1_single_value=operand1_single_value,
        operand2_single_value=operand2_single_value,
        comparison_type=predicate['type'],
        multivalues=predicate['multivalues_step_two'],
        multivalues_explanation=predicate['multivalues_explanation'],
        rule=rule)


def _strip_known_tokens(expression, known_ids):
    '''
    Return what is left of expression once every identifier found in
    known_ids and the characters ' ', '(', ')', '&', '|' and '-' have been
    removed. An empty result means the expression only uses known
    predicates and authorized characters.
    '''
    leftover = _PREDICATE_ID_RE.sub(
        lambda match: '' if match.group(0) in known_ids else match.group(0),
        expression)
    for char in ' ()&|-':
        leftover = leftover.replace(char, '')
    return leftover


@transaction.commit_manually
def check_data_and_create_permission(request, who, what, how):
    '''
    Create an ABAC permission from the predicates and the logical
    expression kept in session.

    request -- request whose session holds 'predicates' (a list of
        predicate dictionaries) and 'rule' (the logical expression written
        with the friendly predicate identifiers 1, 2, ... shown on the GUI)
    who -- subject of the permission, or a false value for none
    what, how -- object and action of the permission

    Return (permission, message) on success and (None, message) on failure.
    The transaction is managed manually: any failure rolls back everything
    created here, success commits.
    '''
    p = None
    try:
        try:
            rule = AbacRule()
            rule.save()
        except Exception:
            raise Exception('Unable to initialize rule')

        # Friendly identifier (1-based position, as a string) -> primary
        # key of the saved predicate.
        pk_by_friendly_id = {}
        predicates = request.session.get('predicates') or []
        for p_id, predicate in enumerate(predicates, 1):
            if 'type' not in predicate:
                raise Exception('Missing type of predicate %s' % p_id)
            if predicate['type'] == PREDICATE_REQUIRED:
                pred = _build_required_predicate(predicate, p_id, rule)
            elif predicate['type'] == PREDICATE_ROLE:
                if 'role' not in predicate:
                    raise Exception('Missing role of predicate %s' % p_id)
                pred = PredicateRole(role=predicate['role'], rule=rule)
            else:
                pred = _build_comparison_predicate(predicate, p_id, rule)
            pred.save()
            pk_by_friendly_id[str(p_id)] = pred.id

        if not pk_by_friendly_id:
            raise Exception('No predicate defined')

        expression = request.session.get('rule')
        if expression is None:
            raise Exception('No logical expression defined')

        # Check that the expression only contains predicate ids and the
        # characters '(', ')', '&', '|', '-'.
        leftover = _strip_known_tokens(expression, pk_by_friendly_id)
        if leftover:
            raise Exception('The logical expression contains unknown '
                'predicates or unauthorized characters (%s)' % leftover)

        # Check that the logical expression is well-formed.
        if not is_proposition(expression):
            raise Exception('The logical expression is malformed')

        # Substitute friendly identifiers with primary keys in one pass
        # over whole numbers: a key written by the substitution is never
        # rescanned, and identifier 1 is never replaced inside 10.
        rule.expression = _PREDICATE_ID_RE.sub(
            lambda match: str(pk_by_friendly_id[match.group(0)]),
            expression)
        rule.save()

        if who:
            p = AcsAbacPermission(who=who, what=what, how=how, rule=rule)
        else:
            p = AcsAbacPermission(what=what, how=how, rule=rule)
        if not p:
            raise Exception('Fail to create permission')
        try:
            p.save()
        except Exception as err:
            raise Exception('Fail to save permission with error: %s' % err)
        logger.info(
            'check_data_and_create_permission: Permission added: %s' % str(p))

    except Exception as err:
        logger.info('check_data_and_create_permission: '
            'Rollback transaction because of %s.' % err)
        transaction.rollback()
        return (None, _('Failed to create permission due to %s') % str(err))
    else:
        logger.info(
            'check_data_and_create_permission: Commit transaction.')
        transaction.commit()
        return (p, _('Successful creation of permission'))
|
|
|
|
|
|
def handle_operand(predicate, p_id, name, d):
    '''
    Create and save the assertion backing one operand of a comparison
    predicate.

    predicate -- predicate dictionary; the operand is read from
        predicate[name + '_defined']
    p_id -- friendly identifier of the predicate, only used in messages
    name -- operand name ('operand1' or 'operand2' for the callers in this
        module)
    d -- attribute definition name of the operand

    When the operand type is "definition", an AssertionDefinition is saved
    and every selected source is attached to it. Otherwise the selected
    values are checked with convert_from_string and stored in a saved
    AssertionData. Return the assertion; raise Exception with an
    explanatory message on missing or invalid data.
    '''
    operand = predicate[name + '_defined']

    if operand['type'] == "definition":
        if 'sources_selected' not in operand:
            raise Exception('Missing sources of %s of predicate %s'
                % (name, str(p_id)))
        try:
            a = AssertionDefinition(definition=d)
            a.save()
        except Exception:
            raise Exception(
                'Unable to create assertion of %s of predicate %s'
                % (name, str(p_id)))
        for s_id, s_name in operand['sources_selected']:
            try:
                source = AttributeSource.objects.get(id=s_id)
            except Exception:
                raise Exception(
                    'Unable to find source (%s, %s) of %s of predicate %s'
                    % (s_id, s_name, name, str(p_id)))
            try:
                a.add_source(source)
            except Exception:
                raise Exception(
                    'Unable to attach source %s of %s of predicate %s'
                    % (source, name, str(p_id)))
        return a

    if 'values_selected' not in operand:
        raise Exception('Missing values of %s of predicate %s'
            % (name, str(p_id)))
    values = operand['values_selected']
    for value in values:
        # NOTE(review): any false result is treated as a conversion
        # failure; if convert_from_string can legitimately return 0, False
        # or '' those values are rejected here -- confirm its contract.
        if not convert_from_string(d, value):
            raise Exception('The value %s cannot be converted into the '
                'type of the definition %s for predicate %s'
                % (value, d, str(p_id)))
    data = AttributeData(definition=d, values=values)
    try:
        a = AssertionData()
        a.save()
        a.set_attribute_data(data)
    except Exception:
        raise Exception(
            'Unable to create assertion data of predicate %s'
            % str(p_id))
    return a
|
|
|
|
|
|
@check_policy_in_session
@check_authorized_for_abac
def return_add_abac_permission_form(request, template_name='add_abac_permission.html'):
    '''
    Render the form used to build an ABAC permission from the state kept
    in session.

    The context holds the multivalues choices still to be made for the
    predicate being edited (if any), the data needed to edit that
    predicate or the list of predicate types otherwise, the predicates and
    rule defined so far, and the who/what/how lists restricted to the
    namespace of the policy in session.
    '''
    context = {}
    policy = get_policy_from_session(request)
    session = request.session

    choices = []
    context['multivalues'] = choices

    if 'working_predicate' in session \
            and 'type' in session['working_predicate']:
        working = session['working_predicate']
        kind = working['type']
        is_equality = kind in XACML_COMPARISON_EQUALITY
        is_ordering = kind in ACS_XACML_COMPARISON
        # Step one done, step two still to be chosen.
        awaiting_choice = (is_equality or is_ordering) \
            and 'multivalues_step_one' in working \
            and 'multivalues_step_two' not in working

        # NOTE(review): only 'operandtwo_singlevalued' is tested here (the
        # same key used to be tested twice) -- confirm whether
        # 'operandone_singlevalued' was meant to be involved as well.
        if awaiting_choice and is_equality \
                and 'operandtwo_singlevalued' not in working:
            choices.extend([
                ('EQUAL_ONE_VALUE', _('The first attribute may have multiple values and at least one must be equal to a value of the second attribute')),
                ('EQUAL_OP1_SUBSET_OP2', _('The first attribute may have multiple values and each must be equal to a value of the second attribute (subset)')),
                ('EQUAL_EXACT_MATCH', _('The first attribute may have multiple values that exactly match the values of the second attribute')),
            ])

        elif awaiting_choice and is_ordering:
            wording = None
            if kind in ACS_XACML_COMPARISON_LT:
                wording = 'less than'
            elif kind in ACS_XACML_COMPARISON_LT_OE:
                wording = 'less than or equal to'
            elif kind in ACS_XACML_COMPARISON_GRT:
                wording = 'greater than'
            elif kind in ACS_XACML_COMPARISON_GRT_OE:
                wording = 'greater than or equal to'
            choices.extend([
                ('DIFF_ALL_OP1_WITH_UPPER_LIMIT_OP2', _('All values of operand one must be %s the highest value of operand two') % wording),
                ('DIFF_ALL_OP1_WITH_BOTTOM_LIMIT_OP2', _('All values of operand one must be %s the smallest value of operand two') % wording),
                ('DIFF_ONE_OP1_WITH_UPPER_LIMIT_OP2', _('At least one value of operand one must be %s the highest value of operand two') % wording),
                ('DIFF_ONE_OP1_WITH_BOTTOM_LIMIT_OP2', _('At least one value of operand one must be %s the smallest value of operand two') % wording),
            ])

    if 'working_predicate' in session:
        working = session['working_predicate']
        context['working_predicate'] = working
        kind = working['type']
        if kind == PREDICATE_REQUIRED:
            context['attribute_definitions'] = get_all_attribute_definitions()
        elif kind == PREDICATE_ROLE:
            if is_policy_user_administrator(request.user, policy):
                context['roles'] = \
                    Role.objects.filter(namespace=policy.namespace)
            else:
                context['roles'] = filter_list_in_namespace(
                    return_list_roles_authorized_for_admin(
                        set_default_alias(request.user)),
                    policy.namespace)
        else:
            # Only the definitions whose data type suits the comparison.
            context['attribute_definitions'] = [
                name for name in get_all_attribute_definitions()
                if get_attribute_type_of_definition(name)
                    == ACS_COMP_TYPE[kind]]
        context['sources'] = get_all_sources()

    else:
        context['predicate_types'] = PREDICATE_TYPES_TYPE

    # NOTE(review): everything below is built whether or not a predicate is
    # being edited -- confirm the template expects these keys in both cases.
    if 'predicates' in session:
        # Pair each predicate with its friendly, 1-based identifier.
        context['predicates'] = list(enumerate(session['predicates'], 1))

    if 'rule' in session:
        context['rule'] = session['rule']

    if is_policy_user_administrator(request.user, policy):
        context['who_to_display'] = \
            UserAlias.objects.filter(namespace=policy.namespace)
    else:
        context['who_to_display'] = filter_list_in_namespace(
            return_list_users_authorized_for_admin(
                set_default_alias(request.user)),
            policy.namespace)

    targets = filter_list_in_namespace(
        return_list_objects_authorized_for_admin(
            set_default_alias(request.user))
        + return_list_views_authorized_for_admin(
            set_default_alias(request.user)),
        policy.namespace)
    context['what_to_display'] = targets
    if is_policy_object_creator(request.user, policy):
        for model in (AcsObject, View):
            for item in model.objects.filter(namespace=policy.namespace):
                if item not in targets:
                    targets.append(item)

    operations = filter_list_in_namespace(
        return_list_actions_authorized_for_admin(
            set_default_alias(request.user))
        + return_list_activities_authorized_for_admin(
            set_default_alias(request.user)),
        policy.namespace)
    context['how_to_display'] = operations
    if is_policy_action_creator(request.user, policy):
        for model in (Action, Activity):
            for item in model.objects.filter(namespace=policy.namespace):
                if item not in operations:
                    operations.append(item)

    context['title'] = _('Add a permission in %s') % policy
    context['backlink'] = 'mod_policy?id=' + str(policy.id)
    return render_to_response(template_name, context,
        context_instance=RequestContext(request))
|
|
|
|
|
|
@csrf_exempt
@check_policy_in_session
@check_authorized_for_abac
def list_abac_permissions(request):
    '''
    Render the list of ABAC permissions accepted by check_abac_permission
    for the current user and the policy in session.

    Permissions can only be set on objects all being in the same policy
    '''
    policy = get_policy_from_session(request)
    accepted = [permission
        for permission in AcsAbacPermission.objects.all()
        if check_abac_permission(request.user, policy, permission)]

    context = {
        'list_any': accepted,
        'title': _('Delete an ABAC permission'),
        'back_url': 'list_abac_permissions',
        'backlink': 'mod_policy?id=' + str(policy.id),
        'policy': policy,
    }
    return render_to_response('list_abac_permissions.html', context,
        context_instance=RequestContext(request))
|
|
|
|
|
|
# NOTE(review): this view deletes data on POST while being exempted from
# CSRF protection -- confirm the exemption is really required.
@csrf_exempt
@check_policy_in_session
@check_authorized_for_abac
def del_abac_permission(request):
    '''
    Delete an ABAC permission after confirmation.

    Expected POST fields: 'id' (permission to delete), optionally
    'back_url' (where to redirect afterwards), 'Cancel' (abort) and
    'consent' (deletion confirmed). Without 'consent' the confirmation
    page is rendered; in every other case the answer is a redirection to
    the back URL, with the outcome reported through the messages
    framework. Requests that are not POST are redirected without any
    action.
    '''
    policy = get_policy_from_session(request)
    back = '/list_abac_permissions'
    if request.method != 'POST':
        return HttpResponseRedirect(back)

    # NOTE(review): the redirection target is taken from the request
    # without validation -- consider restricting it to local URLs.
    if request.POST.get('back_url'):
        back = request.POST['back_url']

    if 'Cancel' in request.POST:
        messages.add_message(request, messages.INFO,
            _('Operation canceled'))
        return HttpResponseRedirect(back)

    p = None
    if request.POST.get('id'):
        try:
            p = AcsAbacPermission.objects.get(id=request.POST['id'])
        except Exception:
            # Unknown or malformed identifier: reported just below.
            p = None
    if p is None:
        messages.add_message(request, messages.ERROR,
            _('Unknown permission'))
        return HttpResponseRedirect(back)

    if not check_abac_permission(request.user, policy, p):
        messages.add_message(request, messages.ERROR,
            _('You have not enough rights to perform this deletion'))
        return HttpResponseRedirect(back)

    if 'consent' not in request.POST:
        # Ask for confirmation before deleting anything.
        tpl_parameters = {'permission': p, 'back_url': back}
        template_name = 'deletion_abac_permission_confirmation.html'
        return render_to_response(template_name, tpl_parameters,
            context_instance=RequestContext(request))

    logger.info('del_abac_permission: deletion of %s' % p)
    try:
        remove_rule(p.rule)
        p.delete()
    except Exception as err:
        # Keep a trace of the failure in the logs, not only on the page.
        logger.error(
            'del_abac_permission: unable to delete permission due to %s'
            % err)
        messages.add_message(request, messages.ERROR,
            _('Unable to delete permission due to %s') % str(err))
    else:
        logger.info('del_abac_permission: permission deleted')
        messages.add_message(request, messages.INFO,
            _('Permission deleted'))
    return HttpResponseRedirect(back)
|