[abac] Use attribute aggregator module.

The core management of attributes of ACS now relies on the attribute_aggregator
  module.
This commit is contained in:
Mikaël Ates 2011-09-26 14:23:05 +02:00
parent 3a71afbbc2
commit ab15e93dac
5 changed files with 234 additions and 1254 deletions

View File

@ -30,11 +30,13 @@ from acs.models import Namespace, UserAlias
from acs.abac.models import *
from acs.xacml.constants import *
from attribute_aggregator.xacml_constants import *
from attribute_aggregator.core import get_attribute_type_of_definition
from acs.core import get_alias_in_policy_from_namespace, \
stack_of_roles_from_user
logger = logging.getLogger('abac')
@ -86,36 +88,26 @@ def check_predicate_comparison(predicate, profile):
l_data1 = []
data1 = predicate.operand1.get_assertion_instance()
if isinstance(data1, AssertionDefinition):
sources = AttachedSource.objects.filter(assertion=data1)
if not sources:
return False
definition = data1.attribute_definition
for source in sources:
for ad in profile.assertions.all():
if ad.source.id == source.source.id \
and ad.attribute_data.definition.id == definition.id:
l_data1.append(ad.attribute_data)
if not l_data1:
return False
'''
find in profile the data corresponding to this definition
'''
for source in data1.sources.all():
l_data1 += profile.get_data_of_definition_and_source(\
data1.get_definition(), source)
else:
l_data1.append(data1.attribute_data)
l_data1 = [data1.get_attribute_data()]
#operand2
l_data2 = []
data2 = predicate.operand2.get_assertion_instance()
if isinstance(data2, AssertionDefinition):
sources = AttachedSource.objects.filter(assertion=data2)
if not sources:
return False
definition = data2.attribute_definition
for source in sources:
for ad in profile.assertions.all():
if ad.source.id == source.source.id \
and ad.attribute_data.definition.id == definition.id:
l_data2.append(ad.attribute_data)
if not l_data2:
return False
'''
find in profile the data corresponding to this definition
'''
for source in data2.sources.all():
l_data2 += profile.get_data_of_definition_and_source(\
data2.get_definition(), source)
else:
l_data2.append(data2.attribute_data)
l_data2 = [data2.get_attribute_data()]
for d1 in l_data1:
for d2 in l_data2:
if compare_two_data(predicate, d1, d2):
@ -190,35 +182,16 @@ def check_predicate_comparison(predicate, profile):
def compare_two_data(predicate, data1, data2):
if data1.definition.id != data2.definition.id:
logger.debug("compare_two_data: check if definitions match")
if data1.get_definition() != data2.get_definition():
logger.debug("compare_two_data: \
Definition %s does not match with %s" \
% (data1.get_definition(), data2.get_definition()))
return False
if data1.definition.attribute_type == ACS_XACML_DATATYPE_STRING:
data1_values = StringM.objects.filter(data=data1)
data2_values = StringM.objects.filter(data=data2)
elif data1.definition.attribute_type == ACS_XACML_DATATYPE_BOOLEAN:
data1_values = BooleanM.objects.filter(data=data1)
data2_values = BooleanM.objects.filter(data=data2)
elif data1.definition.attribute_type == ACS_XACML_DATATYPE_INTEGER:
data1_values = IntegerM.objects.filter(data=data1)
data2_values = IntegerM.objects.filter(data=data2)
elif data1.definition.attribute_type == ACS_XACML_DATATYPE_DOUBLE:
data1_values = DoubleM.objects.filter(data=data1)
data2_values = DoubleM.objects.filter(data=data2)
elif data1.definition.attribute_type == ACS_XACML_DATATYPE_TIME:
data1_values = TimeM.objects.filter(data=data1)
data2_values = TimeM.objects.filter(data=data2)
elif data1.definition.attribute_type == ACS_XACML_DATATYPE_DATE:
data1_values = DateM.objects.filter(data=data1)
data2_values = DateM.objects.filter(data=data2)
elif data1.definition.attribute_type == ACS_XACML_DATATYPE_DATETIME:
data1_values = DateTimeM.objects.filter(data=data1)
data2_values = DateTimeM.objects.filter(data=data2)
elif data1.definition.attribute_type == ACS_XACML_DATATYPE_RFC822NAME:
data1_values = Rfc822NameM.objects.filter(data=data1)
data2_values = Rfc822NameM.objects.filter(data=data2)
elif data1.definition.attribute_type == ACS_XACML_DATATYPE_IPADDRESS:
data1_values = IpAddressM.objects.filter(data=data1)
data2_values = IpAddressM.objects.filter(data=data2)
logger.debug("compare_two_data: convert values...")
data1_values = data1.get_converted_values()
data2_values = data2.get_converted_values()
if not data1_values or not data2_values:
logger.debug("compare_two_data: \
@ -230,15 +203,16 @@ def compare_two_data(predicate, data1, data2):
logger.debug("compare_two_data: \
Return False because a single value is required for operand one \
and multiple were found %s for predicate %s" \
% ([x.value for x in data1_values], predicate))
% ([x for x in data1_values], predicate))
return False
if predicate.operand2_single_value and len(data2_values) > 1:
logger.debug("compare_two_data: \
Return False because a single value is required for operand two \
and multiple were found %s for predicate %s" \
% ([x.value for x in data2_values], predicate))
% ([x for x in data2_values], predicate))
return False
if not(predicate.operand1_single_value \
and predicate.operand2_single_value) \
and ((predicate.comparison_type in XACML_COMPARISON_EQUALITY \
@ -259,17 +233,16 @@ def compare_two_data(predicate, data1, data2):
return False
logger.debug("compare_two_data: \
Evaluation of predicate %s" \
% str(predicate))
Evaluation of predicate %s" % str(predicate))
if predicate.comparison_type in XACML_COMPARISON_EQUALITY:
return test_equality_of_values(data1_values, data2_values,
data1.definition.attribute_type,
get_attribute_type_of_definition(data1.get_definition()),
predicate.comparison_type,
predicate.multivalues)
elif predicate.comparison_type in ACS_XACML_COMPARISON:
return test_diff_of_multivalues(data1_values, data2_values,
data1.definition.attribute_type,
get_attribute_type_of_definition(data1.get_definition()),
predicate.comparison_type,
predicate.multivalues)
@ -280,13 +253,14 @@ def compare_two_data(predicate, data1, data2):
return False
def test_diff_of_multivalues(data1_values, data2_values, data_type, comparison_type,
test_type='NO_MULTIVALUES'):
def test_diff_of_multivalues(data1_values, data2_values, data_type,
comparison_type, test_type='NO_MULTIVALUES'):
if test_type == 'NO_MULTIVALUES':
logger.debug("test_diff_of_multivalues: \
No multivalues, test %s and %s" \
% (data1_values[0].value, data2_values[0].value))
if test_diff_two_values(data1_values[0].value, data2_values[0].value, comparison_type):
% (data1_values[0], data2_values[0]))
if test_diff_two_values(data1_values[0], data2_values[0],
comparison_type):
logger.debug("test_diff_of_multivalues: True")
return True
else:
@ -306,7 +280,7 @@ def test_diff_of_multivalues(data1_values, data2_values, data_type, comparison_t
ACS_XACML_COMPARISON_GRT_OE):
for v1 in data1_values:
for v2 in data2_values:
if not test_diff_two_values(v1.value, v2.value, comparison_type):
if not test_diff_two_values(v1, v2, comparison_type):
logger.debug("test_diff_of_multivalues: \
DIFF_ALL_OP1_WITH_BOTTOM_LIMIT_OP2 (LT) or \
DIFF_ALL_OP1_WITH_UPPER_LIMIT_OP2 (GRT) - \
@ -328,7 +302,7 @@ def test_diff_of_multivalues(data1_values, data2_values, data_type, comparison_t
for v2 in data2_values:
found = True
for v1 in data1_values:
if not test_diff_two_values(v1.value, v2.value, comparison_type):
if not test_diff_two_values(v1, v2, comparison_type):
found = False
if found:
return True
@ -352,7 +326,7 @@ def test_diff_of_multivalues(data1_values, data2_values, data_type, comparison_t
for v1 in data1_values:
found = True
for v2 in data2_values:
if not test_diff_two_values(v1.value, v2.value, comparison_type):
if not test_diff_two_values(v1, v2, comparison_type):
found = False
if found:
return True
@ -375,7 +349,7 @@ def test_diff_of_multivalues(data1_values, data2_values, data_type, comparison_t
ACS_XACML_COMPARISON_GRT_OE):
for v1 in data1_values:
for v2 in data2_values:
if test_diff_two_values(v1.value, v2.value, comparison_type):
if test_diff_two_values(v1, v2, comparison_type):
return True
logger.debug("test_diff_of_multivalues: \
DIFF_ONE_OP1_WITH_BOTTOM_LIMIT_OP2 (GRT) or \
@ -405,15 +379,15 @@ def test_diff_two_values(value1, value2, comparison_type):
return False
def test_equality_of_values(data1_values, data2_values, data_type, comparison_type,
test_type='NO_MULTIVALUES'):
def test_equality_of_values(data1_values, data2_values, data_type,
comparison_type, test_type='NO_MULTIVALUES'):
if test_type == 'NO_MULTIVALUES':
if data_type == ACS_XACML_DATATYPE_STRING \
and comparison_type == \
ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE:
if data1_values[0].value.lower() == data2_values[0].value.lower():
if data1_values[0].lower() == data2_values[0].lower():
return True
elif data1_values[0].value == data2_values[0].value:
elif data1_values[0] == data2_values[0]:
return True
elif test_type == 'EQUAL_ONE_VALUE':
for v1 in data1_values:
@ -421,9 +395,9 @@ def test_equality_of_values(data1_values, data2_values, data_type, comparison_ty
if data_type == ACS_XACML_DATATYPE_STRING \
and comparison_type == \
ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE:
if v1.value.lower() == v2.value.lower():
if v1.lower() == v2.lower():
return True
elif v1.value == v2.value:
elif v1 == v2:
return True
elif test_type == 'EQUAL_OP1_SUBSET_OP2':
for v1 in data1_values:
@ -432,9 +406,9 @@ def test_equality_of_values(data1_values, data2_values, data_type, comparison_ty
if data_type == ACS_XACML_DATATYPE_STRING \
and comparison_type == \
ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE:
if v1.value.lower() == v2.value.lower():
if v1.lower() == v2.lower():
found = True
elif v1.value == v2.value:
elif v1 == v2:
found = True
if not found:
return False
@ -448,9 +422,9 @@ def test_equality_of_values(data1_values, data2_values, data_type, comparison_ty
if data_type == ACS_XACML_DATATYPE_STRING \
and comparison_type == \
ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE:
if v1.value.lower() == v2.value.lower():
if v1.lower() == v2.lower():
found = True
elif v1.value == v2.value:
elif v1 == v2:
found = True
if not found:
return False
@ -470,36 +444,28 @@ def check_predicate_required(predicate, profile):
it is required to use multiple required predicates and NOT operators
'''
logger.debug("check_predicate_required: check %s" % predicate)
definition = predicate.definition.attribute_definition
sources = AttachedSource.objects.filter(assertion=predicate.definition)
if not predicate.single_value:
logger.debug("check_predicate_required: single value not required")
for source in sources:
for ad in profile.assertions.all():
if ad.source.id == source.source.id \
and ad.attribute_data.definition.id == definition.id:
logger.debug("check_predicate_required: satisfied")
return True
for source in predicate.get_sources():
d = profile.get_data_of_definition_and_source(\
predicate.get_definition(), source)
if d:
logger.debug("check_predicate_required: satisfied by %s" \
% str([a.__unicode__() for a in d]))
return True
else:
logger.debug("check_predicate_required: single value required")
found = False
for source in sources:
c = 0
for ad in profile.assertions.all():
if ad.source.id == source.source.id \
and ad.attribute_data.definition.id == definition.id:
c = c + 1
if c > 1:
logger.debug("check_predicate_required: multiple values found \
- not satisfied")
return False
elif c == 1:
found = True
if found:
match = [profile.get_data_of_definition_and_source(\
predicate.get_definition(), source) \
for source in predicate.get_sources()]
if len(match) == 1:
logger.debug("check_predicate_required: OK only one attribute \
with a single value in profile")
return True
logger.debug("check_predicate_required: At least one source with a \
unique value found")
else:
logger.debug("check_predicate_required: Failed - no value found \
or multiple")
return False
@ -513,7 +479,8 @@ def check_predicate_role(predicate, profile):
or not predicate.role \
or not profile.user:
return False
alias = get_alias_in_policy_from_namespace(profile.user, predicate.role.namespace)
alias = get_alias_in_policy_from_namespace(profile.user,
predicate.role.namespace)
if not alias:
logger.debug("check_predicate_role: no alias found for user: %s \
in namespace %s" % (profile.user, predicate.role.namespace))
@ -549,223 +516,105 @@ def check_predicates(rule, profile):
return dic
def get_source_form_name(name):
@transaction.commit_manually
def remove_predicate(predicate):
try:
return Source.objects.get(name=name)
except:
return None
def get_def_from_name_and_ns(name, namespace):
if not name or not namespace:
return None
try:
ans = AttributeName.objects.filter(name=name)
for an in ans:
if an.attribute_map.namespace.identifier == namespace:
return an.attribute_map.attribute_definition
return None
except:
return None
#@transaction.commit_manually
def add_assertion_to_profile(profile, source, definition, values,
expiration_date=None):
'''
Values is a list of strings converted to the data type given by the
definition datatype
'''
a = None
logger.debug('add_assertion_to_profile: Add assertion to profile %s \
- (%s, %s, %s)' % (profile, source, definition, values))
try:
data = None
try:
data = AttributeData(definition=definition)
data.save()
except Exception, err:
logger.error('add_assertion_to_profile: Unable to add the assertion \
data due to %s' % err)
raise err
if definition.attribute_type == ACS_XACML_DATATYPE_STRING:
for value in values:
try:
StringM(data=data, value=value).save()
except Exception, err:
logger.error('add_assertion_to_profile: Unable to add value \
%s due to %s' % (value, err))
elif definition.attribute_type == ACS_XACML_DATATYPE_BOOLEAN:
for value in values:
try:
v = False
if value in ('True', 'true', 'Vrai', 'vrai'):
v = True
BooleanM(data=data, value=v).save()
except Exception, err:
logger.error('add_assertion_to_profile: Unable to convert in \
boolean or add value %sdue to %s' % (value, err))
elif definition.attribute_type == ACS_XACML_DATATYPE_INTEGER:
for value in values:
try:
v = int(value)
IntegerM(data=data, value=v).save()
except Exception, err:
logger.error('add_assertion_to_profile: Unable to convert in \
integer or add value %s due to %s' % (value, err))
elif definition.attribute_type == ACS_XACML_DATATYPE_DOUBLE:
for value in values:
try:
v = float(value)
IntegerM(data=data, value=v).save()
except Exception, err:
logger.error('add_assertion_to_profile: Unable to convert in \
double or add value %sdue to %s' % (value, err))
elif definition.attribute_type == ACS_XACML_DATATYPE_TIME:
for value in values:
try:
v = time.strptime(value,"%h:%m:%s") #12:15:00
TimeM(data=data, value=v).save()
except Exception, err:
logger.error('add_assertion_to_profile: Unable to convert in \
time or add value %sdue to %s' % (value, err))
elif definition.attribute_type == ACS_XACML_DATATYPE_DATE:
for value in values:
try:
v = time.strptime(value,"%d/%b/%Y") #28/01/1982
DateM(data=data, value=v).save()
except Exception, err:
logger.error('add_assertion_to_profile: Unable to convert in \
date or add value %sdue to %s' % (value, err))
elif definition.attribute_type == ACS_XACML_DATATYPE_DATETIME:
for value in values:
try:
v = time.strptime(value,"%d/%b/%Y-%h:%m:%s") #28/01/1982-03:00:00
DateTimeM(data=data, value=v).save()
except Exception, err:
logger.error('add_assertion_to_profile: Unable to convert in \
datetime or add value %sdue to %s' % (value, err))
elif definition.attribute_type == ACS_XACML_DATATYPE_RFC822NAME:
for value in values:
try:
v = None
r = re.compile('[a-zA-Z0-9+_\-\.]+@[0-9a-zA-Z][.-0-9a-zA-Z]*.[a-zA-Z]+') # email
if r.search(value):
v = value
else:
logger.error('add_assertion_to_profile: Unable to convert in \
rfc822name or add value %sdue to %s' % (value, err))
Rfc822NameM(data=data, value=v).save()
except Exception, err:
logger.error('add_assertion_to_profile: Unable to convert in \
rfc822name or add value %sdue to %s' % (value, err))
elif definition.attribute_type == ACS_XACML_DATATYPE_IPADDRESS:
for value in values:
try:
v = None
r = re.compile('(?:[\d]{1,3})\.(?:[\d]{1,3})\.(?:[\d]{1,3})\.(?:[\d]{1,3})') # x.x.x.x
if r.search(value):
v = value
else:
logger.error('add_assertion_to_profile: Unable to convert in \
ipaddress or add value %sdue to %s' % (value, err))
IpAddressM(data=data, value=v).save()
except Exception, err:
logger.error('add_assertion_to_profile: Unable to convert in \
ipaddress or add value %sdue to %s' % (value, err))
if not predicate:
raise Exception(_('No predicate provided'))
else:
logger.error('add_assertion_to_profile: Unknown definition data type \
%s' % definition.attribute_type)
raise Exception()
if expiration_date:
#expiration_date = time.strptime(expiration_date,"%d/%b/%Y-%h:%m:%s") #28/01/1982-03:00:00
logger.debug('add_assertion_to_profile: expiration date %s found \
- conversion' % expiration_date)
try:
expiration_date = iso8601_to_datetime(expiration_date)
except:
logger.error('add_assertion_to_profile: Conversion failed - \
the date is not ISO8601')
expiration_date = None
logger.debug(\
'remove_predicate: Begin deletion of predicate %s with id %s' \
% (predicate, predicate.id))
'''
Possibilities if the profile contains an assertion with the same
definition and the same source:
- append values if expiration_date are the same
- remove and the new assertion
- add a new assertion withtou checking
Objects to delete for predicate required:
- AssertionDefinition
The third choices is made.
Then the profile may contain multiple assertion for a same
definition and source.
Objects to delete for predicate role:
- None
!!! It is enough that one of them satisfies the predicate. !!!
cf. functions check_predicate_comparison and
check_predicate_required
If it is not the behaviour expected, reduce expiration date.
Objects to delete for predicate comparisons:
- AssertionDefinition
- AssertionData
'''
try:
a = AssertionData(source=source, attribute_data=data,
expiration_date=expiration_date)
a.save()
profile.assertions.add(a)
profile.save()
except Exception, err:
logger.error('add_assertion_to_profile: Unable to add the assertion \
data due to %s' % err)
raise err
instance = predicate.get_predicate_instance()
if isinstance(instance, PredicateRole):
pass
elif isinstance(instance, PredicateRequired):
logger.debug('remove_predicate: predicate required found')
instance.assertion_definition.delete()
elif isinstance(instance, PredicateComparison):
logger.debug('remove_predicate: predicate comparison found')
assertion = instance.operand1.get_assertion_instance()
if isinstance(assertion, AssertionDefinition):
logger.debug('remove_predicate: \
operand one is an assertion definition')
logger.debug('remove_predicate: \
remove assertion definition with id %s' %assertion.id)
assertion.delete()
elif isinstance(assertion, AssertionData):
logger.debug('remove_predicate: \
operand one is an assertion data')
logger.debug('remove_predicate: \
remove assertion data with id %s' % assertion.id)
assertion.delete()
else:
raise Exception(_('Unknown operand one'))
assertion = instance.operand2.get_assertion_instance()
if isinstance(assertion, AssertionDefinition):
logger.debug('remove_predicate: \
operand two is an assertion definition')
logger.debug('remove_predicate: \
remove assertion definition with id %s' %assertion.id)
assertion.delete()
elif isinstance(assertion, AssertionData):
logger.debug('remove_predicate: \
operand two is an assertion data')
logger.debug('remove_predicate: \
remove assertion data with id %s' % assertion.id)
assertion.delete()
else:
raise Exception(_('Unknown operand two'))
else:
raise Exception(_('Unknown predicate type'))
logger.debug('remove_predicate: deletion of the predicate')
predicate.delete()
except Exception, err:
# transaction.rollback()
return None
transaction.rollback()
logger.critical('remove_predicate: error deleting predicate due to %s'
% err)
raise err
else:
# transaction.commit()
logger.debug('add_assertion_to_profile: everything successfully \
performed- %s added' % a)
return a
transaction.commit()
logger.debug('remove_predicate: predicate deleted')
def iso8601_to_datetime(date_string):
'''
Convert a string formatted as an ISO8601 date into a time_t value.
This function ignores the sub-second resolution.
'''
m = re.match(r'(\d+-\d+-\d+T\d+:\d+:\d+)(?:\.\d+)?Z?$', date_string)
logger.debug('add_assertion_to_profile: m %s' % str(m))
if not m:
raise ValueError('Invalid ISO8601 date')
tm = time.strptime(m.group(1)+'Z', "%Y-%m-%dT%H:%M:%SZ")
logger.debug('add_assertion_to_profile: tm %s' % str(tm))
return datetime.datetime.fromtimestamp(time.mktime(tm))
@transaction.commit_manually
def remove_rule(rule):
try:
if not rule:
raise Exception(_('No rule provided'))
else:
logger.debug(\
'remove_rule: Begin deletion of rule %s with id %s' \
% (rule, rule.id))
for p in Predicate.objects.filter(rule=rule):
logger.debug('remove_rule: found predicate %s' % p)
remove_predicate(p)
def arrange_missing_predicates(missing_predicates, profile):
'''
Here we have to retreat the priority of possibilities to satisfy
the rule.
missing_predicates present mutliples combinaisons possible to satisfy
the rule.
Output: we want to produce a rule with all the possibilities sorted by
the easiest to the hardest: rule = easiest_rule | ... | hardest_rule
This sort is based on the number of predicate not statisfied but also
on the following rules:
- First predicates required
- Comparison if predicate is False because an operande is missing
- Comparisons with only def
- comparison with data not satisfied
In practice, we prefer to ask the user to present its surname and its
firstname (2 predicate False up to now) rather to ask to present its
age if it has already been presented but did not satisfied a
comparison (only one predicate False but harder to satisfy)
TODO
'''
pass
logger.debug('remove_rule: deletion of the rule')
rule.delete()
except Exception, err:
transaction.rollback()
logger.critical('remove_rule: error deleting rule due to %s'
% err)
raise err
else:
transaction.commit()
logger.debug('remove_rule: rule deleted')
def make_new_rule_from_missing_predicates(missing_predicates):
@ -796,319 +645,25 @@ def make_new_rule_from_missing_predicates(missing_predicates):
return None
def get_namespace_by_friendly_name(name):
if not name:
return None
try:
return AttributeNamespace.objects.get(friendly_name=name)
except ObjectDoesNotExist:
return None
except MultipleObjectsReturned:
print '###ERROR: multiple namespace with the friendly name %s' %ns
def get_namespace_by_identifier(identifier):
if not identifier:
return None
try:
return AttributeNamespace.objects.get(identifier=identifier)
except ObjectDoesNotExist:
return None
except MultipleObjectsReturned:
print '###ERROR: multiple namespace with the identifier name %s' %identifier
def get_attribute_definition_by_name(name):
if not name:
return None
try:
return AttributeDefinition.objects.get(attribute_name=name)
except ObjectDoesNotExist:
return None
except MultipleObjectsReturned:
print '###ERROR: multiple attribute definition with the name %s' %name
def get_all_attribute_definitions():
try:
return AttributeDefinition.objects.all()
except:
return None
def get_all_sources():
try:
return Source.objects.all()
except:
return None
@transaction.commit_manually
def load_or_create_user_profile(user, no_cleanup=False):
def arrange_missing_predicates(missing_predicates, profile):
'''
If the user has a profile, remove assertions outdated and return
profile, else create a new one.
Here we have to retreat the priority of possibilities to satisfy
the rule.
missing_predicates present mutliples combinaisons possible to satisfy
the rule.
Output: we want to produce a rule with all the possibilities sorted by
the easiest to the hardest: rule = easiest_rule | ... | hardest_rule
This sort is based on the number of predicate not statisfied but also
on the following rules:
- First predicates required
- Comparison if predicate is False because an operande is missing
- Comparisons with only def
- comparison with data not satisfied
In practice, we prefer to ask the user to present its surname and its
firstname (2 predicate False up to now) rather to ask to present its
age if it has already been presented but did not satisfied a
comparison (only one predicate False but harder to satisfy)
If cleanup: expiration_date < now() remove assertion data from profile
If no_cleanup: return profile if any without removing outdated
assertions
TODO
'''
profile = None
try:
if user:
try:
logger.info('load_or_create_user_profile: search profile for %s' \
% user)
profile = UserAttributeProfile.objects.get(user=user)
except ObjectDoesNotExist:
profile = UserAttributeProfile(user=user)
profile.save()
logger.info('load_or_create_user_profile: profile with id %s for \
user %s created' % (str(profile.id), user))
else:
if no_cleanup:
logger.info('load_or_create_user_profile: Existing user \
profile %s for %s - No cleanup' % (profile, user))
else:
for ad in profile.assertions.all():
if not ad.expiration_date \
or ad.expiration_date < datetime.datetime.now():
logger.debug('load_or_create_user_profile: \
Remove outdated assertion %s of profile %s' \
% (ad, profile))
ad.delete()
else:
logger.debug('load_or_create_user_profile: \
%s kept in profile' \
% ad)
elif not user:
profile = UserAttributeProfile()
profile.save()
logger.info('load_or_create_user_profile: profile with id %s for \
an anonymous user created' % str(profile.id))
except Exception, err:
transaction.rollback()
logger.error('load_or_create_user_profile: An error occured %s' \
% str(err))
return None
else:
transaction.commit()
logger.debug('load_or_create_user_profile: everything successfully \
performed')
return profile
def load_profile_by_dic(profile, dic):
if not profile or not dic:
logger.error('load_profile_by_dic: \
Missing profile or dictionnary')
return
for source in dic:
logger.debug('load_profile_by_dic: loading from source with name: %s' \
% source)
s = get_source_form_name(source)
if s:
logger.debug('load_profile_by_dic: attributes: %s' \
% str(dic[source]))
for attr in dic[source]:
if not 'name' in attr or not 'namespace' in attr \
or not 'values' in attr:
logger.debug('load_profile_by_dic: \
missing data in attribute')
else:
logger.debug('load_profile_by_dic: \
attribute %s of %s with values %s' \
% (attr['name'], attr['namespace'],
str([x for x in attr['values']])))
expiration_date = None
if 'expiration_date' in attr:
logger.debug('load_profile_by_dic: expire at %s' \
% attr['expiration_date'])
expiration_date = attr['expiration_date']
d = get_def_from_name_and_ns(attr['name'],
attr['namespace'])
if not d:
logger.error('load_profile_by_dic: \
definition not found for %s %s' \
% (attr['name'], attr['namespace']))
else:
logger.debug('load_profile_by_dic: \
definition %s found' % d)
a = add_assertion_to_profile(profile, s, d,
attr['values'], expiration_date=expiration_date)
if not a:
logger.debug('load_profile_by_dic: \
error adding assertion')
else:
logger.debug('load_profile_by_dic: \
assertion %s added' % a)
else:
logger.critical('load_profile_by_dic: \
The source with name %s and attributes %s \
is unknown of the system' \
% (str(source), str(dic[source])))
def get_assertions_from_definition(profile, definition, source=None):
if not source:
return [a for a in profile.assertions.all() \
if a.definition.id == definition.id]
return [a for a in profile.assertions.all() \
if a.definition.id == definition.id \
and a.source.id == source.id]
@transaction.commit_manually
def remove_predicate(predicate):
try:
if not predicate:
raise Exception(_('No predicate provided'))
else:
logger.debug('remove_predicate: Begin deletion of predicate %s with id %s' % (predicate, predicate.id))
'''
Objects to delete for predicate required:
- AssertionDefinition
- Attached source
Objects to delete for predicate role:
- None
Objects to delete for predicate comparisons:
- AssertionDefinition
- Attached source
- AssertionData
- Attribute data
- Values
'''
instance = predicate.get_predicate_instance()
if isinstance(instance, PredicateRole):
pass
elif isinstance(instance, PredicateRequired):
logger.debug('remove_predicate: predicate required found')
for s in AttachedSource.objects.filter(assertion=instance.definition):
logger.debug('remove_predicate: remove attached source with id %s' %s.id)
s.delete()
logger.debug('remove_predicate: remove assertion definition with id %s' %instance.definition.id)
instance.definition.delete()
elif isinstance(instance, PredicateComparison):
logger.debug('remove_predicate: predicate comparison found')
assertion = instance.operand1.get_assertion_instance()
if isinstance(assertion, AssertionDefinition):
logger.debug('remove_predicate: operand one is an assertion definition')
for s in AttachedSource.objects.filter(assertion=assertion):
logger.debug('remove_predicate: remove attached source with id %s' %s.id)
s.delete()
logger.debug('remove_predicate: remove assertion definition with id %s' %assertion.id)
assertion.delete()
elif isinstance(assertion, AssertionData):
logger.debug('remove_predicate: operand one is an assertion data')
for v in assertion.get_values():
logger.debug('remove_predicate: remove value %s with id %s' % (v, v.id))
v.delete()
logger.debug('remove_predicate: remove attribute data with id %s' % assertion.attribute_data.id)
assertion.attribute_data.delete()
logger.debug('remove_predicate: remove assertion data with id %s' % assertion.id)
assertion.delete()
else:
raise Exception(_('Unknown operand one'))
assertion = instance.operand2.get_assertion_instance()
if isinstance(assertion, AssertionDefinition):
logger.debug('remove_predicate: operand two is an assertion definition')
for s in AttachedSource.objects.filter(assertion=assertion):
logger.debug('remove_predicate: remove attached source with id %s' %s.id)
s.delete()
logger.debug('remove_predicate: remove assertion definition with id %s' %assertion.id)
assertion.delete()
elif isinstance(assertion, AssertionData):
logger.debug('remove_predicate: operand two is an assertion data')
for v in assertion.get_values():
logger.debug('remove_predicate: remove value %s with id %s' % (v, v.id))
v.delete()
data = assertion.attribute_data
logger.debug('remove_predicate: remove assertion data with id %s' % assertion.id)
assertion.delete()
logger.debug('remove_predicate: remove attribute data with id %s' % data.id)
data.delete()
else:
raise Exception(_('Unknown operand two'))
else:
raise Exception(_('Unknown predicate type'))
logger.debug('remove_predicate: deletion of the predicate')
predicate.delete()
except Exception, err:
transaction.rollback()
logger.critical('remove_predicate: error deleting predicate due to %s'
% err)
raise err
else:
transaction.commit()
logger.debug('remove_predicate: predicate deleted')
@transaction.commit_manually
def remove_rule(rule):
try:
if not rule:
raise Exception(_('No rule provided'))
else:
logger.debug('remove_rule: Begin deletion of rule %s with id %s' % (rule, rule.id))
for p in Predicate.objects.filter(rule=rule):
logger.debug('remove_rule: found predicate %s' % p)
remove_predicate(p)
logger.debug('remove_rule: deletion of the rule')
rule.delete()
except Exception, err:
transaction.rollback()
logger.critical('remove_rule: error deleting rule due to %s'
% err)
raise err
else:
transaction.commit()
logger.debug('remove_rule: rule deleted')
def get_identifier_in_source(user, source):
if not user:
logger.error('get_identifier_in_source: no user provided')
return None
if not source:
logger.error('get_identifier_in_source: no source provided')
return None
'''
If the source is used for authentication and the option
is_auth_backend is selected, we consider that the username is the
identifier in the ldap, i.e. the dn
We may do better with django_to_ldap_username() of
http://packages.python.org/django-auth-ldap/
'''
if source.is_auth_backend:
return user.username
ns = None
try:
ns = Namespace.objects.get(name=source.name)
except ObjectDoesNotExist:
logger.error('get_identifier_in_source: no corresponding namespace')
return None
except MultipleObjectsReturned:
logger.critical('get_identifier_in_source: multiple namespaces \
corresponding to source %s' % source)
return None
try:
ua = UserAlias.objects.get(user=user, namespace=ns)
logger.debug('get_identifier_in_source: user has alias %s in source \
%s' % (ua.alias, source.name))
return ua.alias
except ObjectDoesNotExist:
logger.error('get_identifier_in_source: user has no correspondiong \
alias for that source')
return None
except MultipleObjectsReturned:
logger.critical('get_identifier_in_source: multiple aliases of user \
%s for the source %s' % (user, source))
return None
pass

View File

@ -21,151 +21,12 @@ import re
import random
import string
from cPickle import loads, dumps
from django.db import models
from django.contrib.auth.models import User
from django.utils.translation import ugettext as _
from acs.xacml.constants import *
class Source(models.Model):
name = models.CharField(max_length = 100, unique=True)
def __unicode__(self):
return 'Source %s' % self.name
def get_source_instance(self):
try:
return self.ldapsource
except:
pass
return None
class LdapSource(Source):
server = models.CharField(max_length=100, unique=True)
user = models.CharField(max_length=100, blank=True, null=True)
password = models.CharField(max_length=100, blank=True, null=True)
base = models.CharField(max_length=100)
port = models.IntegerField(default=389)
ldaps = models.BooleanField(default=False)
certificate = models.TextField(blank=True)
is_auth_backend = models.BooleanField(default=False)
class AttributeDefinition(models.Model):
attribute_name = models.CharField(max_length = 100, unique=True)
attribute_type = models.CharField(
max_length = 100, choices = XACML_DATA_TYPE,
verbose_name = '',
default = ACS_XACML_DATATYPE_STRING)
def __unicode__(self):
return self.attribute_name
class AttributeNamespace(models.Model):
friendly_name = models.CharField(max_length = 100, unique=True)
identifier = models.CharField(max_length = 100, unique=True)
class AttributeMapInNamespace(models.Model):
namespace = models.ForeignKey(AttributeNamespace)
attribute_definition = models.ForeignKey(AttributeDefinition)
class AttributeName(models.Model):
attribute_map = models.ForeignKey(AttributeMapInNamespace)
name = models.CharField(max_length = 100)
class AttributeData(models.Model):
definition = models.ForeignKey(AttributeDefinition)
class StringM(models.Model):
'''http://www.w3.org/2001/XMLSchema#string'''
data = models.ForeignKey(AttributeData)
value = models.CharField(max_length = 100)
def __unicode__(self):
return self.value
class BooleanM(models.Model):
'''http://www.w3.org/2001/XMLSchema#boolean'''
data = models.ForeignKey(AttributeData)
value = models.BooleanField()
def __unicode__(self):
return str(self.value)
class IntegerM(models.Model):
'''http://www.w3.org/2001/XMLSchema#integer'''
data = models.ForeignKey(AttributeData)
value = models.IntegerField()
def __unicode__(self):
return str(self.value)
class DoubleM(models.Model):
'''http://www.w3.org/2001/XMLSchema#double'''
data = models.ForeignKey(AttributeData)
value = models.FloatField()
def __unicode__(self):
return str(self.value)
class TimeM(models.Model):
'''http://www.w3.org/2001/XMLSchema#time'''
data = models.ForeignKey(AttributeData)
value = models.TimeField()
def __unicode__(self):
return str(self.value)
class DateM(models.Model):
'''http://www.w3.org/2001/XMLSchema#date'''
data = models.ForeignKey(AttributeData)
value = models.DateField()
def __unicode__(self):
return str(self.value)
class DateTimeM(models.Model):
'''http://www.w3.org/2001/XMLSchema#dateTime'''
data = models.ForeignKey(AttributeData)
value = models.DateTimeField()
def __unicode__(self):
return str(self.value)
class Rfc822NameM(models.Model):
'''urn:oasis:names:tc:xacml:1.0:data-type:rfc822Name'''
data = models.ForeignKey(AttributeData)
value = models.EmailField()
def __unicode__(self):
return str(self.value)
class IpAddressM(models.Model):
'''urn:oasis:names:tc:xacml:2.0:data-type:ipAddress'''
data = models.ForeignKey(AttributeData)
value = models.IPAddressField()
def __unicode__(self):
return str(self.value)
class Certificate(models.Model):
raw_data = models.TextField()
from attribute_aggregator.xacml_constants import *
from attribute_aggregator.models import AttributeSource
class AssertionAny(models.Model):
@ -182,97 +43,59 @@ class AssertionAny(models.Model):
return None
class AssertionDefinition(AssertionAny):
attribute_definition = models.ForeignKey(AttributeDefinition)
def __unicode__(self):
sources = AttachedSource.objects.filter(assertion=self)
return "attribute %s from %s" \
% (str(self.attribute_definition),
[str(x.source) for x in sources])
class AttachedSource(models.Model):
source = models.ForeignKey(Source)
assertion = models.ForeignKey(AssertionDefinition,
related_name = 'assertion')
def __unicode__(self):
return "Source %s attached to %s" \
% (str(self.source), str(self.assertion))
class AssertionData(AssertionAny):
'''
An assertion data may have no source or a unique source attached.
No source when used in a rule for a comparison for instance.
A unique source when used in a profile for instance.
'''
attribute_data = models.ForeignKey(AttributeData)
source = models.ForeignKey(Source, null=True, blank=True)
certificate = models.ForeignKey(Certificate, null=True, blank=True)
creation_date = models.DateTimeField(auto_now_add=True)
expiration_date = models.DateTimeField(null=True, blank=True)
attribute_data = models.TextField(null=True, blank=True)
# def __init__(self, attribute_data=None, *args, **kwargs):
# super(AssertionData, self).__init__(*args, **kwargs)
# if attribute_data:
# self.attribute_data = dumps(attribute_data)
def get_attribute_data(self):
return loads(str(self.attribute_data))
def get_values(self):
values = None
definition = self.attribute_data.definition
if definition.attribute_type == ACS_XACML_DATATYPE_STRING:
values = StringM.objects.filter(data=self.attribute_data)
elif definition.attribute_type == ACS_XACML_DATATYPE_BOOLEAN:
values = BooleanM.objects.filter(data=self.attribute_data)
elif definition.attribute_type == ACS_XACML_DATATYPE_INTEGER:
values = IntegerM.objects.filter(data=self.attribute_data)
elif definition.attribute_type == ACS_XACML_DATATYPE_DOUBLE:
values = DoubleM.objects.filter(data=self.attribute_data)
elif definition.attribute_type == ACS_XACML_DATATYPE_TIME:
values = TimeM.objects.filter(data=self.attribute_data)
elif definition.attribute_type == ACS_XACML_DATATYPE_DATE:
values = DateM.objects.filter(data=self.attribute_data)
elif definition.attribute_type == ACS_XACML_DATATYPE_DATETIME:
values = DateTimeM.objects.filter(data=self.attribute_data)
elif definition.attribute_type == ACS_XACML_DATATYPE_RFC822NAME:
values = Rfc822NameM.objects.filter(data=self.attribute_data)
elif definition.attribute_type == ACS_XACML_DATATYPE_IPADDRESS:
values = IpAddressM.objects.filter(data=self.attribute_data)
return values
def set_attribute_data(self, attribute_data):
if attribute_data:
self.attribute_data = dumps(attribute_data)
self.save()
return self.get_attribute_data()
return None
def __unicode__(self):
values = self.get_values()
s = "Attribute %s with values %s" \
% (str(self.attribute_data.definition),
[str(x) for x in values])
if self.source:
s += ' (provided by %s)' % str(self.source)
if self.certificate:
s += ' (signed with %s)' % str(self.certificate)
if self.expiration_date:
s += ' (expires on %s)' % str(self.expiration_date)
data = self.get_attribute_data()
return data.__unicode__()
class AssertionDefinition(AssertionAny):
definition = models.CharField(max_length=200)
sources = models.ManyToManyField(AttributeSource, null=True, blank=True)
def add_source(self, source):
if not source in self.sources.all():
self.sources.add(source)
self.save()
return 0
def get_definition(self):
return self.definition
def remove_source(self, source):
if source in self.sources.all():
self.sources.delete(source)
self.save()
return 0
def __unicode__(self):
s = self.definition
if self.sources:
s += ' from '
for source in self.sources.all():
s = s + source.name + ', '
if len(s) > 6:
s = s[:-2]
else:
s = self.definition
return s
class UserAttributeProfile(models.Model):
user = models.OneToOneField(User, null=True, blank=True,
related_name='profile')
assertions = models.ManyToManyField(AssertionData,
verbose_name=_('data_assertions'), blank=True)
def __unicode__(self):
if not self.user:
s = 'Anonymous profile'
else:
s = 'Profile of user %s' % self.user
if not self.assertions:
return s + ' is empty.'
else:
for ad in self.assertions.all():
attribute_data = ad.attribute_data
s += " - assertion from %s with definition %s and values %s" \
% (ad.source, attribute_data.definition.id,
str([str(x.value) for x in ad.get_values()]))
return s
'''
An ABAC rule is a string containing logical statements (and, or, not) and
the identifiers of predicates.
@ -313,14 +136,21 @@ class Predicate(models.Model):
class PredicateRequired(Predicate):
definition = models.ForeignKey(AssertionDefinition)
assertion_definition = models.ForeignKey(AssertionDefinition)
single_value = models.BooleanField(default=False)
def get_definition(self):
return self.assertion_definition.definition
def get_sources(self):
return self.assertion_definition.sources.all()
def __unicode__(self):
if self.single_value:
return "Predicate required: %s - \
The attribute must be single-valued" % str(self.definition)
return "Predicate required: %s" % str(self.definition)
The attribute must be single-valued" \
% str(self.assertion_definition)
return "Predicate required: %s" % str(self.assertion_definition)
class PredicateRole(Predicate):
@ -364,8 +194,7 @@ class PredicateComparison(Predicate):
operator = '>='
s = 'Predicate comparison: %s %s %s (' \
% (str(self.operand1.get_assertion_instance()),
operator, str(self.operand2.get_assertion_instance()),
)
operator, str(self.operand2.get_assertion_instance()))
if self.operand1_single_value:
s += 'operand one requires a single-valued attribute - '
if self.operand2_single_value:

View File

@ -1,108 +0,0 @@
import logging
import ldap
from django.dispatch import Signal
from models import UserAlias
from abac.models import LdapSource, AttributeNamespace
logger = logging.getLogger('acs')
attributes_call = Signal(providing_args = ["request", "user"])
def ldap_sources(request, user, **kwargs):
if not user:
logger.error('ldap_source: No user provided')
return {}
logger.debug('ldap_source: Searching attributes for user %s' % user)
sources = LdapSource.objects.all()
if not sources:
logger.debug('ldap_source: No LDAP source configured')
return {}
att_ns = AttributeNamespace.objects.get(friendly_name='LDAP')
attribute_namespace = att_ns.identifier
attributes = {}
for source in sources:
logger.debug('ldap_source: The LDAP source is known as %s' \
% source.name)
'''
Grab the DN
'''
identifier = None
if isinstance(user, UserAlias):
'''
If the request is on a user alias with no django user,
attributes can be retrieved if the alias is an ldap dn
'''
identifier = user.alias
else:
from abac.core import get_identifier_in_source
identifier = get_identifier_in_source(user, source)
if not identifier:
logger.error('ldap_source: No user identifier known into that \
source')
else:
logger.debug('ldap_source: the user is known as %s in source %s' \
% (identifier, source.name))
try:
l = ldap.open(source.server)
l.protocol_version = ldap.VERSION3
username = source.user
password = source.password
l.simple_bind(username, password)
except ldap.LDAPError, err:
logger.error('ldap_source: an error occured at binding due \
to %s' % err)
else:
base_dn = source.base
search_scope = ldap.SCOPE_SUBTREE
retrieve_attributes = None
dn = ldap.dn.explode_dn(identifier,
flags=ldap.DN_FORMAT_LDAPV3)
search_filter = dn[0]
logger.debug('ldap_source: rdn is %s' % search_filter)
data = []
try:
ldap_result_id = l.search(base_dn, search_scope,
search_filter, retrieve_attributes)
result_type, result_data = l.result(ldap_result_id, 0)
logger.debug('ldap_source: result %s %s' % (result_type,
result_data))
for d, dic in result_data:
logger.debug('ldap_source: found %s' % d)
if d == identifier:
logger.debug('ldap_source: Attributes are %s' \
% dic)
for key in dic.keys():
attr = {}
attr['name'] = key
attr['values'] = [\
a.decode('utf-8'). \
encode('ascii', 'ignore') \
for a in dic[key]]
attr['namespace'] = attribute_namespace
data.append(attr)
except ldap.LDAPError, err:
logger.error('ldap_source: an error occured at searching \
due to %s' % err)
else:
if not data:
logger.error('ldap_source: no attribute found')
else:
attributes[source.name] = data
logger.debug('ldap_source: the attributes returned are %s' % attributes)
return attributes
attributes_call.connect(ldap_sources)

View File

View File

@ -1,296 +0,0 @@
'''
VERIDIC - Towards a centralized access control system
Copyright (C) 2011 Mikael Ates
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
'''
'''
(Core - 3958) For the sake of improved interoperability, it is RECOMMENDED
that all time references be in UTC time.
(Core - 3960) For doubles, XACML SHALL use the conversions described in
[IEEE754]
'''
#RENAME me to XACML constants!
from django.utils.translation import ugettext as _
ACS_XACML_DATATYPE_STRING = "http://www.w3.org/2001/XMLSchema#string"
ACS_XACML_DATATYPE_BOOLEAN = "http://www.w3.org/2001/XMLSchema#boolean"
ACS_XACML_DATATYPE_INTEGER = "http://www.w3.org/2001/XMLSchema#integer"
ACS_XACML_DATATYPE_DOUBLE = "http://www.w3.org/2001/XMLSchema#double"
ACS_XACML_DATATYPE_TIME = "http://www.w3.org/2001/XMLSchema#time"
ACS_XACML_DATATYPE_DATE = "http://www.w3.org/2001/XMLSchema#date"
ACS_XACML_DATATYPE_DATETIME = "http://www.w3.org/2001/XMLSchema#dateTime"
#ACS_XACML_DATATYPE_ANYURI = "http://www.w3.org/2001/XMLSchema#anyURI"
#ACS_XACML_DATATYPE_HEXBINARY = "http://www.w3.org/2001/XMLSchema#hexBinary"
#ACS_XACML_DATATYPE_BASE64BINARY = \
# "http://www.w3.org/2001/XMLSchema#base64Binary"
#ACS_XACML_DATATYPE_DAYTIMEDURATION = \
# "http://www.w3.org/2001/XMLSchema#dayTimeDuration"
#ACS_XACML_DATATYPE_YEARMONTHDURATION = \
# "http://www.w3.org/2001/XMLSchema#yearMonthDuration"
'''
An ITU-T Rec.X.520 Distinguished Name.
The valid syntax for such a name is described in IETF RFC 2253
"Lightweight Directory Access Protocol (v3): UTF-8 String Representation of
Distinguished Names".
'''
ACS_XACML_DATATYPE_X500NAME = \
"urn:oasis:names:tc:xacml:1.0:data-type:x500Name"
'''
An electronic mail address. The valid syntax for such a name is described
in IETF RFC 2821, Section 4.1.2,
Command Argument Syntax, under the term "Mailbox".
'''
ACS_XACML_DATATYPE_RFC822NAME = \
"urn:oasis:names:tc:xacml:1.0:data-type:rfc822Name"
'''
The syntax SHALL be:
ipAddress = address [ "/" mask ] [ ":" [ portrange ] ]
For an IPv4 address, the address and mask are formatted in accordance with the syntax for a
"host" in IETF RFC 2396 "Uniform Resource Identifiers (URI): Generic Syntax", section 3.2.
For an IPv6 address, the address and mask are formatted in accordance with the syntax for an
"ipv6reference" in IETF RFC 2732 "Format for Literal IPv6 Addresses in URL's". (Note that an
IPv6 address or mask, in this syntax, is enclosed in literal "[" "]" brackets.)
'''
ACS_XACML_DATATYPE_IPADDRESS = \
"urn:oasis:names:tc:xacml:2.0:data-type:ipAddress"
'''
The syntax SHALL be:
dnsName = hostname [ ":" portrange ]
The hostname is formatted in accordance with IETF RFC 2396 "Uniform Resource Identifiers
(URI): Generic Syntax", section 3.2, except that a wildcard "*" may be used in the left-most
component of the hostname to indicate "any subdomain" under the domain specified to its right.
For both the "urn:oasis:names:tc:xacml:2.0:data-type:ipAddress" and
"urn:oasis:names:tc:xacml:2.0:data-type:dnsName" data-types, the port or port range syntax
SHALL be
portrange = portnumber | "-"portnumber | portnumber"-"[portnumber]
where "portnumber" is a decimal port number. If the port number is of the form "-x", where "x" is
a port number, then the range is all ports numbered "x" and below. If the port number is of the
form "x-", then the range is all ports numbered "x" and above. [This syntax is taken from the Java
SocketPermission.]
'''
ACS_XACML_DATATYPE_DNSNAME = \
"urn:oasis:names:tc:xacml:2.0:data-type:dnsName"
ACS_XACML_DATATYPE_XPATHEXPRESSION = \
"urn:oasis:names:tc:xacml:3.0:data-type:xpathExpression"
XACML_DATA_TYPE = (
(ACS_XACML_DATATYPE_STRING, _('String')),
(ACS_XACML_DATATYPE_BOOLEAN, _('Boolean')),
(ACS_XACML_DATATYPE_INTEGER, _('Integer')),
(ACS_XACML_DATATYPE_DOUBLE, _('Double')),
(ACS_XACML_DATATYPE_TIME, _('Time')),
(ACS_XACML_DATATYPE_DATE, _('Date')),
(ACS_XACML_DATATYPE_DATETIME, _('Date and Time')),
# (ACS_XACML_DATATYPE_ANYURI, _('Any URI')),
# (ACS_XACML_DATATYPE_HEXBINARY, _('Hex Binary')),
# (ACS_XACML_DATATYPE_BASE64BINARY, _('Base64 Binary')),
# (ACS_XACML_DATATYPE_DAYTIMEDURATION, _('Day Time Duration')),
# (ACS_XACML_DATATYPE_YEARMONTHDURATION, _('Year Month Duration')),
# (ACS_XACML_DATATYPE_X500NAME, _('X500 name')),
(ACS_XACML_DATATYPE_RFC822NAME, _('email address')),
(ACS_XACML_DATATYPE_IPADDRESS, _('IP address')),
# (ACS_XACML_DATATYPE_DNSNAME, _('DNS name')),
# (ACS_XACML_DATATYPE_XPATHEXPRESSION, _('XPATH expression')),
)
ACS_XACML_COMPARISON_EQUALITY_STRING = \
"urn:oasis:names:tc:xacml:1.0:function:string-equal"
ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE = \
"urn:oasis:names:tc:xacml:3.0:function:string-equal-ignore-case"
ACS_XACML_COMPARISON_EQUALITY_BOOLEAN = \
"urn:oasis:names:tc:xacml:1.0:function:boolean-equal"
ACS_XACML_COMPARISON_EQUALITY_INTEGER = \
"urn:oasis:names:tc:xacml:1.0:function:integer-equal"
ACS_XACML_COMPARISON_EQUALITY_DOUBLE = \
"urn:oasis:names:tc:xacml:1.0:function:double-equal"
ACS_XACML_COMPARISON_EQUALITY_DATE = \
"urn:oasis:names:tc:xacml:1.0:function:date-equal"
ACS_XACML_COMPARISON_EQUALITY_TIME = \
"urn:oasis:names:tc:xacml:1.0:function:time-equal"
ACS_XACML_COMPARISON_EQUALITY_DATETIME = \
"urn:oasis:names:tc:xacml:1.0:function:dateTime-equal"
ACS_XACML_COMPARISON_EQUALITY_DAYTIMEDURATION = \
"urn:oasis:names:tc:xacml:3.0:function:dayTimeDuration-equal"
ACS_XACML_COMPARISON_EQUALITY_YEARMONTHDURATION = \
"urn:oasis:names:tc:xacml:3.0:function:yearMonthDuration-equal"
ACS_XACML_COMPARISON_EQUALITY_ANYURI = \
"urn:oasis:names:tc:xacml:1.0:function:anyURI-equal"
ACS_XACML_COMPARISON_EQUALITY_X500NAME = \
"urn:oasis:names:tc:xacml:1.0:function:x500Name-equal"
ACS_XACML_COMPARISON_EQUALITY_RFC822NAME = \
"urn:oasis:names:tc:xacml:1.0:function:rfc822Name-equal"
ACS_XACML_COMPARISON_EQUALITY_IPADDRESS = \
"urn:oasis:names:tc:xacml:1.0:function:ipAddress-equal"
ACS_XACML_COMPARISON_EQUALITY_HEXBINARY = \
"urn:oasis:names:tc:xacml:1.0:function:hexBinary-equal"
ACS_XACML_COMPARISON_EQUALITY_BASE64BINARY = \
"urn:oasis:names:tc:xacml:1.0:function:base64Binary-equal"
XACML_COMPARISON_EQUALITY = (
ACS_XACML_COMPARISON_EQUALITY_STRING,
ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE,
ACS_XACML_COMPARISON_EQUALITY_BOOLEAN,
ACS_XACML_COMPARISON_EQUALITY_INTEGER,
ACS_XACML_COMPARISON_EQUALITY_DOUBLE,
ACS_XACML_COMPARISON_EQUALITY_DATE,
ACS_XACML_COMPARISON_EQUALITY_TIME,
ACS_XACML_COMPARISON_EQUALITY_DATETIME,
# ACS_XACML_COMPARISON_EQUALITY_DAYTIMEDURATION, _('Day Time Duration equality')),
# ACS_XACML_COMPARISON_EQUALITY_YEARMONTHDURATION, _('Year Month Duration equality')),
# ACS_XACML_COMPARISON_EQUALITY_ANYURI, _('Any URI equality')),
# ACS_XACML_COMPARISON_EQUALITY_X500NAME, _('X500 name equality')),
ACS_XACML_COMPARISON_EQUALITY_RFC822NAME,
ACS_XACML_COMPARISON_EQUALITY_IPADDRESS,
# ACS_XACML_COMPARISON_EQUALITY_HEXBINARY, _('Hex binary equality')),
# ACS_XACML_COMPARISON_EQUALITY_BASE64BINARY, _('base 64 binary equality')),
)
XACML_COMPARISON_EQUALITY_TYPE = (
(ACS_XACML_COMPARISON_EQUALITY_STRING, _('String equality')),
(ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE, _('String equality ignoring case')),
(ACS_XACML_COMPARISON_EQUALITY_BOOLEAN, _('Boolean equality')),
(ACS_XACML_COMPARISON_EQUALITY_INTEGER, _('Integer equality')),
(ACS_XACML_COMPARISON_EQUALITY_DOUBLE, _('Double equality')),
(ACS_XACML_COMPARISON_EQUALITY_DATE, _('Date equality')),
(ACS_XACML_COMPARISON_EQUALITY_TIME, _('Time equality')),
(ACS_XACML_COMPARISON_EQUALITY_DATETIME, _('DateTime equality')),
# (ACS_XACML_COMPARISON_EQUALITY_DAYTIMEDURATION, _('Day Time Duration equality')),
# (ACS_XACML_COMPARISON_EQUALITY_YEARMONTHDURATION, _('Year Month Duration equality')),
# (ACS_XACML_COMPARISON_EQUALITY_ANYURI, _('Any URI equality')),
# (ACS_XACML_COMPARISON_EQUALITY_X500NAME, _('X500 name equality')),
(ACS_XACML_COMPARISON_EQUALITY_RFC822NAME, _('email equality')),
(ACS_XACML_COMPARISON_EQUALITY_IPADDRESS, _('IP address equality')),
# (ACS_XACML_COMPARISON_EQUALITY_HEXBINARY, _('Hex binary equality')),
# (ACS_XACML_COMPARISON_EQUALITY_BASE64BINARY, _('base 64 binary equality')),
)
XACML_COMPARISON_EQUALITY_TYPE_DIC = {
ACS_XACML_COMPARISON_EQUALITY_STRING: _('String equality'),
ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE: _('String equality ignoring case'),
ACS_XACML_COMPARISON_EQUALITY_BOOLEAN: _('Boolean equality'),
ACS_XACML_COMPARISON_EQUALITY_INTEGER: _('Integer equality'),
ACS_XACML_COMPARISON_EQUALITY_DOUBLE: _('Double equality'),
ACS_XACML_COMPARISON_EQUALITY_DATE: _('Date equality'),
ACS_XACML_COMPARISON_EQUALITY_TIME: _('Time equality'),
ACS_XACML_COMPARISON_EQUALITY_DATETIME: _('DateTime equality'),
# ACS_XACML_COMPARISON_EQUALITY_DAYTIMEDURATION: _('Day Time Duration equality'),
# ACS_XACML_COMPARISON_EQUALITY_YEARMONTHDURATION: _('Year Month Duration equality'),
# ACS_XACML_COMPARISON_EQUALITY_ANYURI: _('Any URI equality'),
# ACS_XACML_COMPARISON_EQUALITY_X500NAME: _('X500 name equality'),
ACS_XACML_COMPARISON_EQUALITY_RFC822NAME: _('email equality'),
ACS_XACML_COMPARISON_EQUALITY_IPADDRESS: _('IP address equality'),
# ACS_XACML_COMPARISON_EQUALITY_HEXBINARY: _('Hex binary equality'),
# ACS_XACML_COMPARISON_EQUALITY_BASE64BINARY: _('base 64 binary equality'),
}
ACS_XACML_COMPARISON_INTEGER_GRT = \
"urn:oasis:names:tc:xacml:1.0:function:integer-greater-than"
ACS_XACML_COMPARISON_INTEGER_GRT_OE = \
"urn:oasis:names:tc:xacml:1.0:function:integer-greater-than-or-equal"
ACS_XACML_COMPARISON_INTEGER_LT = \
"urn:oasis:names:tc:xacml:1.0:function:integer-less-than"
ACS_XACML_COMPARISON_INTEGER_LT_OE = \
"urn:oasis:names:tc:xacml:1.0:function:integer-less-than-or-equal"
ACS_XACML_COMPARISON_DOUBLE_GRT = \
"urn:oasis:names:tc:xacml:1.0:function:double-greater-than"
ACS_XACML_COMPARISON_DOUBLE_GRT_OE = \
"urn:oasis:names:tc:xacml:1.0:function:double-greater-than-or-equal"
ACS_XACML_COMPARISON_DOUBLE_LT = \
"urn:oasis:names:tc:xacml:1.0:function:double-less-than"
ACS_XACML_COMPARISON_DOUBLE_LT_OE = \
"urn:oasis:names:tc:xacml:1.0:function:double-less-than-or-equal"
ACS_XACML_COMPARISON_GRT = (
ACS_XACML_COMPARISON_INTEGER_GRT,
ACS_XACML_COMPARISON_DOUBLE_GRT,
)
ACS_XACML_COMPARISON_GRT_OE = (
ACS_XACML_COMPARISON_INTEGER_GRT_OE,
ACS_XACML_COMPARISON_DOUBLE_GRT_OE,
)
ACS_XACML_COMPARISON_LT = (
ACS_XACML_COMPARISON_INTEGER_LT,
ACS_XACML_COMPARISON_DOUBLE_LT,
)
ACS_XACML_COMPARISON_LT_OE = (
ACS_XACML_COMPARISON_INTEGER_LT_OE,
ACS_XACML_COMPARISON_DOUBLE_LT_OE,
)
ACS_XACML_COMPARISON = ACS_XACML_COMPARISON_GRT \
+ ACS_XACML_COMPARISON_GRT_OE \
+ ACS_XACML_COMPARISON_LT \
+ ACS_XACML_COMPARISON_LT_OE
XACML_COMPARISON_DIFF_TYPE = (
(ACS_XACML_COMPARISON_INTEGER_GRT, _('Integer 1 greater than integer 2')),
(ACS_XACML_COMPARISON_INTEGER_GRT_OE, _('Integer 1 greater than or equal integer 2')),
(ACS_XACML_COMPARISON_INTEGER_LT, _('Integer 1 less than integer 2')),
(ACS_XACML_COMPARISON_INTEGER_LT_OE, _('Integer 1 less than or equal integer 2')),
(ACS_XACML_COMPARISON_DOUBLE_GRT, _('Double 1 greater than double 2')),
(ACS_XACML_COMPARISON_DOUBLE_GRT_OE, _('Double 1 greater than or equal double 2')),
(ACS_XACML_COMPARISON_DOUBLE_LT, _('Double 1 less than double 2')),
(ACS_XACML_COMPARISON_DOUBLE_LT_OE, _('Double 1 less than or equal double 2')),
)
XACML_COMPARISON_DIFF_TYPE_DIC = {
ACS_XACML_COMPARISON_INTEGER_GRT: _('Integer 1 greater than integer 2'),
ACS_XACML_COMPARISON_INTEGER_GRT_OE: _('Integer 1 greater than or equal integer 2'),
ACS_XACML_COMPARISON_INTEGER_LT: _('Integer 1 less than integer 2'),
ACS_XACML_COMPARISON_INTEGER_LT_OE: _('Integer 1 less than or equal integer 2'),
ACS_XACML_COMPARISON_DOUBLE_GRT: _('Double 1 greater than double 2'),
ACS_XACML_COMPARISON_DOUBLE_GRT_OE: _('Double 1 greater than or equal double 2'),
ACS_XACML_COMPARISON_DOUBLE_LT: _('Double 1 less than double 2'),
ACS_XACML_COMPARISON_DOUBLE_LT_OE: _('Double 1 less than or equal double 2'),
}
XACML_COMPARISON_TYPE_DIC = dict(XACML_COMPARISON_EQUALITY_TYPE_DIC.items() \
+ XACML_COMPARISON_DIFF_TYPE_DIC.items())
XACML_COMPARISON_TYPE = XACML_COMPARISON_EQUALITY_TYPE + XACML_COMPARISON_DIFF_TYPE
ACS_COMP_TYPE = {
ACS_XACML_COMPARISON_INTEGER_GRT: ACS_XACML_DATATYPE_INTEGER,
ACS_XACML_COMPARISON_INTEGER_GRT_OE: ACS_XACML_DATATYPE_INTEGER,
ACS_XACML_COMPARISON_INTEGER_LT: ACS_XACML_DATATYPE_INTEGER,
ACS_XACML_COMPARISON_INTEGER_LT_OE: ACS_XACML_DATATYPE_INTEGER,
ACS_XACML_COMPARISON_DOUBLE_GRT: ACS_XACML_DATATYPE_DOUBLE,
ACS_XACML_COMPARISON_DOUBLE_GRT_OE: ACS_XACML_DATATYPE_DOUBLE,
ACS_XACML_COMPARISON_DOUBLE_LT: ACS_XACML_DATATYPE_DOUBLE,
ACS_XACML_COMPARISON_DOUBLE_LT_OE: ACS_XACML_DATATYPE_DOUBLE,
ACS_XACML_COMPARISON_EQUALITY_STRING: ACS_XACML_DATATYPE_STRING,
ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE: ACS_XACML_DATATYPE_STRING,
ACS_XACML_COMPARISON_EQUALITY_BOOLEAN: ACS_XACML_DATATYPE_BOOLEAN,
ACS_XACML_COMPARISON_EQUALITY_INTEGER: ACS_XACML_DATATYPE_INTEGER,
ACS_XACML_COMPARISON_EQUALITY_DOUBLE: ACS_XACML_DATATYPE_DOUBLE,
ACS_XACML_COMPARISON_EQUALITY_DATE: ACS_XACML_DATATYPE_DATE,
ACS_XACML_COMPARISON_EQUALITY_TIME: ACS_XACML_DATATYPE_TIME,
ACS_XACML_COMPARISON_EQUALITY_DATETIME: ACS_XACML_DATATYPE_DATETIME,
# ACS_XACML_COMPARISON_EQUALITY_DAYTIMEDURATION: ACS_XACML_DATATYPE_DAYTIMEDURATION,
# ACS_XACML_COMPARISON_EQUALITY_YEARMONTHDURATION: ACS_XACML_DATATYPE_YEARMONTHDURATION,
# ACS_XACML_COMPARISON_EQUALITY_ANYURI: ACS_XACML_DATATYPE_ANYURI,
# ACS_XACML_COMPARISON_EQUALITY_X500NAME: ACS_XACML_DATATYPE_X500NAME,
ACS_XACML_COMPARISON_EQUALITY_RFC822NAME: ACS_XACML_DATATYPE_RFC822NAME,
ACS_XACML_COMPARISON_EQUALITY_IPADDRESS: ACS_XACML_DATATYPE_IPADDRESS,
# ACS_XACML_COMPARISON_EQUALITY_HEXBINARY: ACS_XACML_DATATYPE_HEXBINARY,
# ACS_XACML_COMPARISON_EQUALITY_BASE64BINARY: ACS_XACML_DATATYPE_BASE64BINARY,
}