839 lines
34 KiB
Python
839 lines
34 KiB
Python
'''
|
|
VERIDIC - Towards a centralized access control system
|
|
|
|
Copyright (C) 2011 Mikael Ates
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU Affero General Public License as
|
|
published by the Free Software Foundation, either version 3 of the
|
|
License, or (at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU Affero General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Affero General Public License
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
'''
|
|
|
|
import re
|
|
import logging
|
|
|
|
from django.db import transaction
|
|
from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
|
|
|
|
from acs.abac.models import *
|
|
|
|
from acs.xacml.constants import *
|
|
|
|
from acs.core import get_alias_in_policy_from_namespace, \
|
|
stack_of_roles_from_user
|
|
|
|
logger = logging.getLogger('abac')
|
|
|
|
|
|
def extract_predicate_ids(expression):
|
|
if not expression:
|
|
return []
|
|
ids = []
|
|
res = re.search(r'\d+', expression)
|
|
while res and res.group(0):
|
|
ids.append(res.group(0))
|
|
p = r'(%s)' % res.group(0)
|
|
expression = re.sub(p, '', expression)
|
|
res = re.search(r'\d+', expression)
|
|
return ids
|
|
|
|
|
|
def check_predicate(predicate, profile):
|
|
if isinstance(predicate, PredicateRequired):
|
|
logger.debug("check_predicate: PredicateRequired %s" % predicate)
|
|
return check_predicate_required(predicate, profile)
|
|
if isinstance(predicate, PredicateComparison):
|
|
logger.debug("check_predicate: PredicateComparison %s" % predicate)
|
|
return check_predicate_comparison(predicate, profile)
|
|
if isinstance(predicate, PredicateRole):
|
|
logger.debug("check_predicate: PredicateRole %s" % predicate)
|
|
return check_predicate_role(predicate, profile)
|
|
return False
|
|
|
|
|
|
def check_predicate_comparison(predicate, profile):
|
|
'''
|
|
Grab definition of the predicate
|
|
Look in profile for a corresponding AssertionData
|
|
Then compare values
|
|
|
|
An AssertionDefinition may indicate multiple sources
|
|
Then multiple AssertionData may correspond. It means that any of them
|
|
may satisfy the predicate
|
|
|
|
Here, we look for every assertionData in the profile
|
|
for a definition and a source
|
|
We accept provided by multiple source or even multiple from the same
|
|
source
|
|
One of them is enough to satisfy the comparison
|
|
|
|
See function below to see how multi valued attributes are treated
|
|
'''
|
|
#operand1
|
|
l_data1 = []
|
|
data1 = predicate.operand1.get_assertion_instance()
|
|
if isinstance(data1, AssertionDefinition):
|
|
sources = AttachedSource.objects.filter(assertion=data1)
|
|
if not sources:
|
|
return False
|
|
definition = data1.attribute_definition
|
|
for source in sources:
|
|
ads = AssertionData.objects.filter(profile=profile, source=source.source)
|
|
for ad in ads:
|
|
if ad.attribute_data.definition.id == definition.id:
|
|
l_data1.append(ad.attribute_data)
|
|
if not l_data1:
|
|
return False
|
|
else:
|
|
l_data1.append(data1.attribute_data)
|
|
#operand2
|
|
l_data2 = []
|
|
data2 = predicate.operand2.get_assertion_instance()
|
|
if isinstance(data2, AssertionDefinition):
|
|
sources = AttachedSource.objects.filter(assertion=data2)
|
|
if not sources:
|
|
return False
|
|
definition = data2.attribute_definition
|
|
for source in sources:
|
|
ads = AssertionData.objects.filter(profile=profile, source=source.source)
|
|
for ad in ads:
|
|
if ad.attribute_data.definition.id == definition.id:
|
|
l_data2.append(ad.attribute_data)
|
|
if not l_data2:
|
|
return False
|
|
else:
|
|
l_data2.append(data2.attribute_data)
|
|
for d1 in l_data1:
|
|
for d2 in l_data2:
|
|
if compare_two_data(predicate, d1, d2):
|
|
return True
|
|
return False
|
|
|
|
|
|
'''
|
|
A predicate contains an option indicating if an attribute must be single valued.
|
|
Since considered has an enforcement, a check includes the test that attribute has a single value if required,
|
|
else that a multivalue option not NO_MULTIVALUES must be set.
|
|
|
|
All:
|
|
NO_MULTIVALUES
|
|
Both operand are single valued attributes
|
|
|
|
Equality:
|
|
EQUAL_ONE_VALUE
|
|
At least one value of the values of OP1 is equal to one value of the values of OP2
|
|
EQUAL_OP1_SUBSET_OP2
|
|
The values of OP1 is is a subset of the values of OP2
|
|
EQUAL_EXACT_MATCH
|
|
Equal set of values
|
|
|
|
Diff strict:
|
|
DIFF_ALL_OP1_WITH_UPPER_LIMIT_OP2
|
|
ACS_XACML_COMPARISON_INTEGER_LT
|
|
All values of OP1 must be less than the highest value of OP2
|
|
ACS_XACML_COMPARISON_INTEGER_GRT
|
|
All values of OP1 must be greater than the highest value of OP2
|
|
DIFF_ALL_OP1_WITH_BOTTOM_LIMIT_OP2
|
|
ACS_XACML_COMPARISON_INTEGER_LT
|
|
All values of OP1 must be less than the smallest value of OP2
|
|
ACS_XACML_COMPARISON_INTEGER_GRT
|
|
All values of OP1 must be greater than the smallest value of OP2
|
|
DIFF_ONE_OP1_WITH_UPPER_LIMIT_OP2
|
|
ACS_XACML_COMPARISON_INTEGER_LT
|
|
At least one value of OP1 must be less than the highest value of OP2
|
|
ACS_XACML_COMPARISON_INTEGER_GRT
|
|
At least one value of OP1 must be greater than the highest value of OP2
|
|
DIFF_ONE_OP1_WITH_BOTTOM_LIMIT_OP2
|
|
ACS_XACML_COMPARISON_INTEGER_LT
|
|
At least one value of OP1 must be less than the smallest value of OP2
|
|
ACS_XACML_COMPARISON_INTEGER_GRT
|
|
At least one value of OP1 must be greater than the smallest value of OP2
|
|
|
|
Diff or equal:
|
|
Same as for strict and equality is treated as follows:
|
|
DIFF_ALL_OP1_WITH_UPPER_LIMIT_OP2
|
|
ACS_XACML_COMPARISON_INTEGER_LT_OE
|
|
All values of OP1 must be less than or equal to the highest value of OP2
|
|
ACS_XACML_COMPARISON_INTEGER_GRT_OE
|
|
All values of OP1 must be greater than or equal to the highest value of OP2
|
|
DIFF_ALL_OP1_WITH_BOTTOM_LIMIT_OP2
|
|
ACS_XACML_COMPARISON_INTEGER_LT_OE
|
|
All values of OP1 must be less than or equal to the smallest value of OP2
|
|
ACS_XACML_COMPARISON_INTEGER_GRT_OE
|
|
All values of OP1 must be greater than or equal to the smallest value of OP2
|
|
DIFF_ONE_OP1_WITH_UPPER_LIMIT_OP2
|
|
ACS_XACML_COMPARISON_INTEGER_LT_OE
|
|
At least one value of OP1 must be less than or equal to the highest value of OP2
|
|
ACS_XACML_COMPARISON_INTEGER_GRT_OE
|
|
At least one value of OP1 must be greater than or equal to the highest value of OP2
|
|
DIFF_ONE_OP1_WITH_BOTTOM_LIMIT_OP2
|
|
ACS_XACML_COMPARISON_INTEGER_LT_OE
|
|
At least one value of OP1 must be less than or equal to the smallest value of OP2
|
|
ACS_XACML_COMPARISON_INTEGER_GRT_OE
|
|
At least one value of OP1 must be greater than or equal to the smallest value of OP2
|
|
|
|
To deal with richer comparison and equality of multivalued attributes, a 'or' statement should be used
|
|
'''
|
|
|
|
|
|
def compare_two_data(predicate, data1, data2):
|
|
if data1.definition.id != data2.definition.id:
|
|
return False
|
|
if data1.definition.attribute_type == ACS_XACML_DATATYPE_STRING:
|
|
data1_values = StringM.objects.filter(data=data1)
|
|
data2_values = StringM.objects.filter(data=data2)
|
|
elif data1.definition.attribute_type == ACS_XACML_DATATYPE_BOOLEAN:
|
|
data1_values = BooleanM.objects.filter(data=data1)
|
|
data2_values = BooleanM.objects.filter(data=data2)
|
|
elif data1.definition.attribute_type == ACS_XACML_DATATYPE_INTEGER:
|
|
data1_values = IntegerM.objects.filter(data=data1)
|
|
data2_values = IntegerM.objects.filter(data=data2)
|
|
elif data1.definition.attribute_type == ACS_XACML_DATATYPE_DOUBLE:
|
|
data1_values = DoubleM.objects.filter(data=data1)
|
|
data2_values = DoubleM.objects.filter(data=data2)
|
|
elif data1.definition.attribute_type == ACS_XACML_DATATYPE_TIME:
|
|
data1_values = TimeM.objects.filter(data=data1)
|
|
data2_values = TimeM.objects.filter(data=data2)
|
|
elif data1.definition.attribute_type == ACS_XACML_DATATYPE_DATE:
|
|
data1_values = DateM.objects.filter(data=data1)
|
|
data2_values = DateM.objects.filter(data=data2)
|
|
elif data1.definition.attribute_type == ACS_XACML_DATATYPE_DATETIME:
|
|
data1_values = DateTimeM.objects.filter(data=data1)
|
|
data2_values = DateTimeM.objects.filter(data=data2)
|
|
elif data1.definition.attribute_type == ACS_XACML_DATATYPE_RFC822NAME:
|
|
data1_values = Rfc822NameM.objects.filter(data=data1)
|
|
data2_values = Rfc822NameM.objects.filter(data=data2)
|
|
elif data1.definition.attribute_type == ACS_XACML_DATATYPE_IPADDRESS:
|
|
data1_values = IpAddressM.objects.filter(data=data1)
|
|
data2_values = IpAddressM.objects.filter(data=data2)
|
|
|
|
if not data1_values or not data2_values:
|
|
logger.debug("compare_two_data: \
|
|
Return False because no values was found for predicate %s" \
|
|
% predicate)
|
|
return False
|
|
|
|
if predicate.operand1_single_value and len(data1_values) > 1:
|
|
logger.debug("compare_two_data: \
|
|
Return False because a single value is required for operand one \
|
|
and multiple were found %s for predicate %s" \
|
|
% ([x.value for x in data1_values], predicate))
|
|
return False
|
|
if predicate.operand2_single_value and len(data2_values) > 1:
|
|
logger.debug("compare_two_data: \
|
|
Return False because a single value is required for operand two \
|
|
and multiple were found %s for predicate %s" \
|
|
% ([x.value for x in data2_values], predicate))
|
|
return False
|
|
|
|
if not(predicate.operand1_single_value \
|
|
and predicate.operand2_single_value) \
|
|
and ((predicate.comparison_type in XACML_COMPARISON_EQUALITY \
|
|
and not predicate.multivalues in \
|
|
('EQUAL_ONE_VALUE',
|
|
'EQUAL_OP1_SUBSET_OP2',
|
|
'EQUAL_EXACT_MATCH')) \
|
|
or (predicate.comparison_type in ACS_XACML_COMPARISON \
|
|
and not predicate.multivalues in \
|
|
('DIFF_ALL_OP1_WITH_BOTTOM_LIMIT_OP2',
|
|
'DIFF_ALL_OP1_WITH_UPPER_LIMIT_OP2',
|
|
'DIFF_ONE_OP1_WITH_BOTTOM_LIMIT_OP2',
|
|
'DIFF_ONE_OP1_WITH_UPPER_LIMIT_OP2'))):
|
|
logger.debug("compare_two_data: \
|
|
Return False because multivalued attributes are accepted and no \
|
|
suitable management option has been selected for predicate %s" \
|
|
% str(predicate))
|
|
return False
|
|
|
|
logger.debug("compare_two_data: \
|
|
Evaluation of predicate %s" \
|
|
% str(predicate))
|
|
if predicate.comparison_type in XACML_COMPARISON_EQUALITY:
|
|
return test_equality_of_values(data1_values, data2_values,
|
|
data1.definition.attribute_type,
|
|
predicate.comparison_type,
|
|
predicate.multivalues)
|
|
|
|
elif predicate.comparison_type in ACS_XACML_COMPARISON:
|
|
return test_diff_of_multivalues(data1_values, data2_values,
|
|
data1.definition.attribute_type,
|
|
predicate.comparison_type,
|
|
predicate.multivalues)
|
|
|
|
logger.debug("compare_two_data: \
|
|
Return False because of unknown comparison type \
|
|
for predicate %s" \
|
|
% predicate)
|
|
return False
|
|
|
|
|
|
def test_diff_of_multivalues(data1_values, data2_values, data_type, comparison_type,
|
|
test_type='NO_MULTIVALUES'):
|
|
if test_type == 'NO_MULTIVALUES':
|
|
logger.debug("test_diff_of_multivalues: \
|
|
No multivalues, test %s and %s" \
|
|
% (data1_values[0].value, data2_values[0].value))
|
|
if test_diff_two_values(data1_values[0].value, data2_values[0].value, comparison_type):
|
|
logger.debug("test_diff_of_multivalues: True")
|
|
return True
|
|
else:
|
|
logger.debug("test_diff_of_multivalues: False")
|
|
return False
|
|
|
|
# BOTTOM LIMIT with LT and UPPER LIMIT with GTR
|
|
# OP1a <= OP1b <= OP2a <= OP2b
|
|
# OP1b >= OP1a >= OP2b >= OP2a
|
|
# With ALL and LT, no value of OP1 must be superior to the bottom limit of OP2
|
|
# With ALL and GRT, no value of OP1 must be less than the upper limit of OP2
|
|
elif (test_type == 'DIFF_ALL_OP1_WITH_BOTTOM_LIMIT_OP2' \
|
|
and comparison_type in ACS_XACML_COMPARISON_LT + \
|
|
ACS_XACML_COMPARISON_LT_OE) \
|
|
or (test_type == 'DIFF_ALL_OP1_WITH_UPPER_LIMIT_OP2' \
|
|
and comparison_type in ACS_XACML_COMPARISON_GRT + \
|
|
ACS_XACML_COMPARISON_GRT_OE):
|
|
for v1 in data1_values:
|
|
for v2 in data2_values:
|
|
if not test_diff_two_values(v1.value, v2.value, comparison_type):
|
|
logger.debug("test_diff_of_multivalues: \
|
|
DIFF_ALL_OP1_WITH_BOTTOM_LIMIT_OP2 (LT) or \
|
|
DIFF_ALL_OP1_WITH_UPPER_LIMIT_OP2 (GRT) - \
|
|
predicate not satisfied")
|
|
return False
|
|
return True
|
|
|
|
# UPPER LIMIT with LT and BOTTOM LIMIT with GTR
|
|
# OP2a <= OP1a <= OP1b <= OP2b
|
|
# OP2b >= OP1b >= OP2a >= OP2a
|
|
# With ALL and LT, no value of OP1 must be superior to the upper limit of OP2
|
|
# With ALL and GRT, no value of OP1 must be less than the bottom limit of OP2
|
|
elif (test_type == 'DIFF_ALL_OP1_WITH_BOTTOM_LIMIT_OP2' \
|
|
and comparison_type in ACS_XACML_COMPARISON_GRT + \
|
|
ACS_XACML_COMPARISON_GRT_OE) \
|
|
or (test_type == 'DIFF_ALL_OP1_WITH_UPPER_LIMIT_OP2' \
|
|
and comparison_type in ACS_XACML_COMPARISON_LT + \
|
|
ACS_XACML_COMPARISON_LT_OE):
|
|
for v2 in data2_values:
|
|
found = True
|
|
for v1 in data1_values:
|
|
if not test_diff_two_values(v1.value, v2.value, comparison_type):
|
|
found = False
|
|
if found:
|
|
return True
|
|
logger.debug("test_diff_of_multivalues: \
|
|
DIFF_ALL_OP1_WITH_BOTTOM_LIMIT_OP2 (GRT) or \
|
|
DIFF_ALL_OP1_WITH_UPPER_LIMIT_OP2 (LT) - \
|
|
predicate not satisfied")
|
|
return False
|
|
|
|
# BOTTOM LIMIT with LT and UPPER LIMIT with GTR
|
|
# OP1a <= OP2a <= OP1b <= OP2b
|
|
# OP1b >= OP2b >= OP2a >= OP2a
|
|
# With ONE and LT, some values of OP1 may be superior to the bottom limit of OP2
|
|
# With ONE and GRT, some values of OP1 must be less than the upper limit of OP2
|
|
elif (test_type == 'DIFF_ONE_OP1_WITH_BOTTOM_LIMIT_OP2' \
|
|
and comparison_type in ACS_XACML_COMPARISON_LT + \
|
|
ACS_XACML_COMPARISON_LT_OE) \
|
|
or (test_type == 'DIFF_ONE_OP1_WITH_UPPER_LIMIT_OP2' \
|
|
and comparison_type in ACS_XACML_COMPARISON_GRT + \
|
|
ACS_XACML_COMPARISON_GRT_OE):
|
|
for v1 in data1_values:
|
|
found = True
|
|
for v2 in data2_values:
|
|
if not test_diff_two_values(v1.value, v2.value, comparison_type):
|
|
found = False
|
|
if found:
|
|
return True
|
|
logger.debug("test_diff_of_multivalues: \
|
|
DIFF_ONE_OP1_WITH_BOTTOM_LIMIT_OP2 (LT) or \
|
|
DIFF_ONE_OP1_WITH_UPPER_LIMIT_OP2 (GRT) - \
|
|
predicate not satisfied")
|
|
return False
|
|
|
|
# UPPER LIMIT with LT and BOTTOM LIMIT with GTR
|
|
# OP1a <= OP2a <= OP1b <= OP2b
|
|
# OP1b >= OP2b >= OP2a >= OP2a
|
|
# With ONE and LT, some values of OP1 may be superior to the upper limit of OP2
|
|
# With ONE and GRT, some values of OP1 must be less than the bottom limit of OP2
|
|
elif (test_type == 'DIFF_ONE_OP1_WITH_UPPER_LIMIT_OP2' \
|
|
and comparison_type in ACS_XACML_COMPARISON_LT + \
|
|
ACS_XACML_COMPARISON_LT_OE) \
|
|
or (test_type == 'DIFF_ONE_OP1_WITH_BOTTOM_LIMIT_OP2' \
|
|
and comparison_type in ACS_XACML_COMPARISON_GRT + \
|
|
ACS_XACML_COMPARISON_GRT_OE):
|
|
for v1 in data1_values:
|
|
for v2 in data2_values:
|
|
if test_diff_two_values(v1.value, v2.value, comparison_type):
|
|
return True
|
|
logger.debug("test_diff_of_multivalues: \
|
|
DIFF_ONE_OP1_WITH_BOTTOM_LIMIT_OP2 (GRT) or \
|
|
DIFF_ONE_OP1_WITH_UPPER_LIMIT_OP2 (LT) - \
|
|
predicate not satisfied")
|
|
return False
|
|
|
|
logger.debug("test_diff_of_multivalues: \
|
|
Unknown multivalue option, \
|
|
predicate not satisfied")
|
|
return False
|
|
|
|
|
|
def test_diff_two_values(value1, value2, comparison_type):
|
|
if comparison_type in ACS_XACML_COMPARISON_GRT:
|
|
if value1 > value2:
|
|
return True
|
|
elif comparison_type in ACS_XACML_COMPARISON_GRT_OE:
|
|
if value1 >= value2:
|
|
return True
|
|
elif comparison_type in ACS_XACML_COMPARISON_LT:
|
|
if value1 < value2:
|
|
return True
|
|
elif comparison_type in ACS_XACML_COMPARISON_LT_OE:
|
|
if value1 <= value2:
|
|
return True
|
|
return False
|
|
|
|
|
|
def test_equality_of_values(data1_values, data2_values, data_type, comparison_type,
|
|
test_type='NO_MULTIVALUES'):
|
|
if test_type == 'NO_MULTIVALUES':
|
|
if data_type == ACS_XACML_DATATYPE_STRING \
|
|
and comparison_type == \
|
|
ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE:
|
|
if data1_values[0].value.lower() == data2_values[0].value.lower():
|
|
return True
|
|
elif data1_values[0].value == data2_values[0].value:
|
|
return True
|
|
elif test_type == 'EQUAL_ONE_VALUE':
|
|
for v1 in data1_values:
|
|
for v2 in data2_values:
|
|
if data_type == ACS_XACML_DATATYPE_STRING \
|
|
and comparison_type == \
|
|
ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE:
|
|
if v1.value.lower() == v2.value.lower():
|
|
return True
|
|
elif v1.value == v2.value:
|
|
return True
|
|
elif test_type == 'EQUAL_OP1_SUBSET_OP2':
|
|
for v1 in data1_values:
|
|
found = False
|
|
for v2 in data2_values:
|
|
if data_type == ACS_XACML_DATATYPE_STRING \
|
|
and comparison_type == \
|
|
ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE:
|
|
if v1.value.lower() == v2.value.lower():
|
|
found = True
|
|
elif v1.value == v2.value:
|
|
found = True
|
|
if not found:
|
|
return False
|
|
return True
|
|
elif test_type == 'EQUAL_EXACT_MATCH':
|
|
if len(data1_values) != len(data2_values):
|
|
return False
|
|
for v1 in data1_values:
|
|
found = False
|
|
for v2 in data2_values:
|
|
if data_type == ACS_XACML_DATATYPE_STRING \
|
|
and comparison_type == \
|
|
ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE:
|
|
if v1.value.lower() == v2.value.lower():
|
|
found = True
|
|
elif v1.value == v2.value:
|
|
found = True
|
|
if not found:
|
|
return False
|
|
return True
|
|
return False
|
|
|
|
|
|
def check_predicate_required(predicate, profile):
|
|
'''
|
|
Look in profile all attributedata with that attribute type
|
|
Then check the source and return True on the first acceptable source
|
|
|
|
A single value attribute is checked per sources
|
|
Declaring multiple source is a OR between sources.
|
|
So the single-valued is checked per source.
|
|
If it is expected to check a unique value with multiple sources
|
|
it is required to use multiple required predicates and NOT operators
|
|
'''
|
|
logger.debug("check_predicate_required: check %s" % predicate)
|
|
definition = predicate.definition.attribute_definition
|
|
sources = AttachedSource.objects.filter(assertion=predicate.definition)
|
|
if not predicate.single_value:
|
|
logger.debug("check_predicate_required: single value not required")
|
|
for source in sources:
|
|
ads = AssertionData.objects.filter(profile=profile, source=source.source)
|
|
for ad in ads:
|
|
if ad.attribute_data.definition.id == definition.id:
|
|
logger.debug("check_predicate_required: satisfied")
|
|
return True
|
|
else:
|
|
logger.debug("check_predicate_required: single value required")
|
|
found = False
|
|
for source in sources:
|
|
ads = AssertionData.objects.filter(profile=profile, source=source.source)
|
|
c = 0
|
|
for ad in ads:
|
|
if ad.attribute_data.definition.id == definition.id:
|
|
c = c + 1
|
|
if c > 1:
|
|
logger.debug("check_predicate_required: multiple values found \
|
|
- not satisfied")
|
|
return False
|
|
elif c == 1:
|
|
found = True
|
|
if found:
|
|
return True
|
|
logger.debug("check_predicate_required: At least one source with a \
|
|
unique value found")
|
|
|
|
return False
|
|
|
|
|
|
def check_predicate_role(predicate, profile):
|
|
'''
|
|
Check that the user has the role or a senior role.
|
|
The user must be in the profile.
|
|
'''
|
|
if not predicate or not profile \
|
|
or not isinstance(predicate, PredicateRole) \
|
|
or not predicate.role \
|
|
or not profile.user:
|
|
return False
|
|
alias = get_alias_in_policy_from_namespace(profile.user, predicate.role.namespace)
|
|
if not alias:
|
|
logger.debug("check_predicate_role: no alias found for user: %s \
|
|
in namespace %s" % (profile.user, predicate.role.namespace))
|
|
return False
|
|
logger.debug("check_predicate_role: check if user %s has role %s" \
|
|
% (alias, predicate.role))
|
|
stack = stack_of_roles_from_user(alias)
|
|
logger.debug("check_predicate_role: roles of the user: %s" \
|
|
% stack)
|
|
if predicate.role in stack:
|
|
logger.debug("check_predicate_role: success")
|
|
return True
|
|
logger.debug("check_predicate_role: failure")
|
|
return False
|
|
|
|
|
|
def check_predicates(rule, profile):
|
|
'''
|
|
Parse rule and list predicates
|
|
Check presence, source, values...
|
|
'''
|
|
if not rule or not profile or not rule.expression:
|
|
return {}
|
|
predicates = []
|
|
dic = {}
|
|
for p in Predicate.objects.filter(rule=rule):
|
|
p_t = p.get_predicate_instance()
|
|
predicates.append(p_t)
|
|
dic[str(p.id)] = check_predicate(p_t, profile)
|
|
logger.debug("check_predicates: found %s" \
|
|
% [str(x) for x in predicates])
|
|
logger.debug("check_predicates: computed %s" % dic)
|
|
return dic
|
|
|
|
|
|
def get_source_form_name(name):
|
|
try:
|
|
return Source.objects.get(name=name)
|
|
except:
|
|
return None
|
|
|
|
|
|
def get_def_from_name_and_ns(name, namespace):
|
|
if not name or not namespace:
|
|
return None
|
|
try:
|
|
ans = AttributeName.objects.filter(name=name)
|
|
for an in ans:
|
|
if an.attribute_map.namespace.identifier == namespace:
|
|
return an.attribute_map.attribute_definition
|
|
return None
|
|
except:
|
|
return None
|
|
|
|
|
|
def add_assertion_to_profile(profile, source, definition, values):
|
|
'''
|
|
values is a list of strings
|
|
converted to the right data type according to the datatype given by the
|
|
definition
|
|
'''
|
|
data = None
|
|
if definition.attribute_type == ACS_XACML_DATATYPE_STRING:
|
|
data = AttributeData(definition=definition)
|
|
data.save()
|
|
for value in values:
|
|
StringM(data=data, value=value).save()
|
|
elif definition.attribute_type == ACS_XACML_DATATYPE_BOOLEAN:
|
|
return None
|
|
elif definition.attribute_type == ACS_XACML_DATATYPE_INTEGER:
|
|
data = AttributeData(definition=definition)
|
|
data.save()
|
|
for value in values:
|
|
v = int(value)
|
|
IntegerM(data=data, value=v).save()
|
|
elif definition.attribute_type == ACS_XACML_DATATYPE_DOUBLE:
|
|
return None
|
|
elif definition.attribute_type == ACS_XACML_DATATYPE_TIME:
|
|
return None
|
|
elif definition.attribute_type == ACS_XACML_DATATYPE_DATE:
|
|
return None
|
|
elif definition.attribute_type == ACS_XACML_DATATYPE_DATETIME:
|
|
return None
|
|
elif definition.attribute_type == ACS_XACML_DATATYPE_RFC822NAME:
|
|
return None
|
|
elif definition.attribute_type == ACS_XACML_DATATYPE_IPADDRESS:
|
|
return None
|
|
else:
|
|
return None
|
|
|
|
try:
|
|
a = AssertionData(profile=profile, source=source, attribute_data=data)
|
|
a.save()
|
|
return a
|
|
except:
|
|
return None
|
|
|
|
|
|
def arrange_missing_predicates(missing_predicates, profile):
|
|
'''
|
|
Here we have to retreat the priority of possibilities to satisfy
|
|
the rule.
|
|
missing_predicates present mutliples combinaisons possible to satisfy
|
|
the rule.
|
|
Output: we want to produce a rule with all the possibilities sorted by
|
|
the easiest to the hardest: rule = easiest_rule | ... | hardest_rule
|
|
This sort is based on the number of predicate not statisfied but also
|
|
on the following rules:
|
|
- First predicates required
|
|
- Comparison if predicate is False because an operande is missing
|
|
- Comparisons with only def
|
|
- comparison with data not satisfied
|
|
In practice, we prefer to ask the user to present its surname and its
|
|
firstname (2 predicate False up to now) rather to ask to present its
|
|
age if it has already been presented but did not satisfied a
|
|
comparison (only one predicate False but harder to satisfy)
|
|
|
|
TODO
|
|
'''
|
|
pass
|
|
|
|
|
|
def make_new_rule_from_missing_predicates(missing_predicates):
|
|
if not missing_predicates:
|
|
return None
|
|
s = ''
|
|
for it in missing_predicates:
|
|
for r in missing_predicates[it]:
|
|
'Or'
|
|
s += '('
|
|
s_tmp = ''
|
|
for p_id, now, exp in r:
|
|
'And'
|
|
'If True to False put negative sign -'
|
|
if now:
|
|
s_tmp += '(-'
|
|
s_tmp += p_id
|
|
if now:
|
|
s_tmp += ')'
|
|
s_tmp += '&'
|
|
s += s_tmp[:-1]
|
|
s += ')'
|
|
if r:
|
|
s += '|'
|
|
if s:
|
|
return s[:-1]
|
|
else:
|
|
return None
|
|
|
|
|
|
def get_namespace_by_friendly_name(name):
|
|
if not name:
|
|
return None
|
|
try:
|
|
return AttributeNamespace.objects.get(friendly_name=name)
|
|
except ObjectDoesNotExist:
|
|
return None
|
|
except MultipleObjectsReturned:
|
|
print '###ERROR: multiple namespace with the friendly name %s' %ns
|
|
|
|
|
|
def get_namespace_by_identifier(identifier):
|
|
if not identifier:
|
|
return None
|
|
try:
|
|
return AttributeNamespace.objects.get(identifier=identifier)
|
|
except ObjectDoesNotExist:
|
|
return None
|
|
except MultipleObjectsReturned:
|
|
print '###ERROR: multiple namespace with the identifier name %s' %identifier
|
|
|
|
|
|
def get_attribute_definition_by_name(name):
|
|
if not name:
|
|
return None
|
|
try:
|
|
return AttributeDefinition.objects.get(attribute_name=name)
|
|
except ObjectDoesNotExist:
|
|
return None
|
|
except MultipleObjectsReturned:
|
|
print '###ERROR: multiple attribute definition with the name %s' %name
|
|
|
|
|
|
def get_all_attribute_definitions():
|
|
try:
|
|
return AttributeDefinition.objects.all()
|
|
except:
|
|
return None
|
|
|
|
|
|
def get_all_sources():
|
|
try:
|
|
return Source.objects.all()
|
|
except:
|
|
return None
|
|
|
|
|
|
def load_profile_by_dic(profile, dic):
|
|
if not profile or not dic:
|
|
return
|
|
for source in dic:
|
|
print "Found source in session: %s" % source
|
|
s = get_source_form_name(source)
|
|
if s:
|
|
print "Found source in db: %s" % s
|
|
if 'attributes' in dic[source]:
|
|
print "attributes: %s" % dic[source]['attributes']
|
|
for attr in dic[source]['attributes']:
|
|
print "With attribute %s of %s with values %s" % \
|
|
(attr['name'], attr['namespace'], str([x for x in attr['values']]))
|
|
d = get_def_from_name_and_ns(attr['name'], attr['namespace'])
|
|
if not d:
|
|
print "definition not found for %s %s" %(attr['name'], attr['namespace'])
|
|
else:
|
|
print "definition found %s" %d
|
|
add_assertion_to_profile(profile, s, d, attr['values'])
|
|
else:
|
|
print "No attributes for this source"
|
|
|
|
|
|
@transaction.commit_manually
|
|
def remove_predicate(predicate):
|
|
try:
|
|
if not predicate:
|
|
raise Exception(_('No predicate provided'))
|
|
else:
|
|
logger.debug('remove_predicate: Begin deletion of predicate %s with id %s' % (predicate, predicate.id))
|
|
|
|
'''
|
|
Objects to delete for predicate required:
|
|
- AssertionDefinition
|
|
- Attached source
|
|
|
|
Objects to delete for predicate comparisons:
|
|
- AssertionDefinition
|
|
- Attached source
|
|
- AssertionData
|
|
- Attribute data
|
|
- Values
|
|
'''
|
|
|
|
instance = predicate.get_predicate_instance()
|
|
if isinstance(instance, PredicateRequired):
|
|
logger.debug('remove_predicate: predicate required found')
|
|
for s in AttachedSource.objects.filter(assertion=instance.definition):
|
|
logger.debug('remove_predicate: remove attached source with id %s' %s.id)
|
|
s.delete()
|
|
logger.debug('remove_predicate: remove assertion definition with id %s' %instance.definition.id)
|
|
instance.definition.delete()
|
|
elif isinstance(instance, PredicateComparison):
|
|
logger.debug('remove_predicate: predicate comparison found')
|
|
assertion = instance.operand1.get_assertion_instance()
|
|
if isinstance(assertion, AssertionDefinition):
|
|
logger.debug('remove_predicate: operand one is an assertion definition')
|
|
for s in AttachedSource.objects.filter(assertion=assertion):
|
|
logger.debug('remove_predicate: remove attached source with id %s' %s.id)
|
|
s.delete()
|
|
logger.debug('remove_predicate: remove assertion definition with id %s' %assertion.id)
|
|
assertion.delete()
|
|
elif isinstance(assertion, AssertionData):
|
|
logger.debug('remove_predicate: operand one is an assertion data')
|
|
for v in assertion.get_values():
|
|
logger.debug('remove_predicate: remove value %s with id %s' % (v, v.id))
|
|
v.delete()
|
|
logger.debug('remove_predicate: remove attribute data with id %s' % assertion.attribute_data.id)
|
|
assertion.attribute_data.delete()
|
|
logger.debug('remove_predicate: remove assertion data with id %s' % assertion.id)
|
|
assertion.delete()
|
|
else:
|
|
raise Exception(_('Unknown operand one'))
|
|
assertion = instance.operand2.get_assertion_instance()
|
|
if isinstance(assertion, AssertionDefinition):
|
|
logger.debug('remove_predicate: operand two is an assertion definition')
|
|
for s in AttachedSource.objects.filter(assertion=assertion):
|
|
logger.debug('remove_predicate: remove attached source with id %s' %s.id)
|
|
s.delete()
|
|
logger.debug('remove_predicate: remove assertion definition with id %s' %assertion.id)
|
|
assertion.delete()
|
|
elif isinstance(assertion, AssertionData):
|
|
logger.debug('remove_predicate: operand two is an assertion data')
|
|
for v in assertion.get_values():
|
|
logger.debug('remove_predicate: remove value %s with id %s' % (v, v.id))
|
|
v.delete()
|
|
data = assertion.attribute_data
|
|
logger.debug('remove_predicate: remove assertion data with id %s' % assertion.id)
|
|
assertion.delete()
|
|
logger.debug('remove_predicate: remove attribute data with id %s' % data.id)
|
|
data.delete()
|
|
else:
|
|
raise Exception(_('Unknown operand two'))
|
|
else:
|
|
raise Exception(_('Unknown predicate type'))
|
|
|
|
logger.debug('remove_predicate: deletion of the predicate')
|
|
predicate.delete()
|
|
except Exception, err:
|
|
transaction.rollback()
|
|
logger.critical('remove_predicate: error deleting predicate due to %s'
|
|
% err)
|
|
raise err
|
|
else:
|
|
transaction.commit()
|
|
logger.debug('remove_predicate: predicate deleted')
|
|
|
|
|
|
@transaction.commit_manually
|
|
def remove_rule(rule):
|
|
try:
|
|
if not rule:
|
|
raise Exception(_('No rule provided'))
|
|
else:
|
|
logger.debug('remove_rule: Begin deletion of rule %s with id %s' % (rule, rule.id))
|
|
|
|
for p in Predicate.objects.filter(rule=rule):
|
|
logger.debug('remove_rule: found predicate %s' % p)
|
|
remove_predicate(p)
|
|
|
|
logger.debug('remove_rule: deletion of the rule')
|
|
rule.delete()
|
|
except Exception, err:
|
|
transaction.rollback()
|
|
logger.critical('remove_rule: error deleting rule due to %s'
|
|
% err)
|
|
raise err
|
|
else:
|
|
transaction.commit()
|
|
logger.debug('remove_rule: rule deleted')
|