[Abac] Use a unique persistent object to store a rule.

This commit is contained in:
Mikaël Ates 2012-03-21 17:26:13 +01:00
parent 97252a3610
commit 6b3b67b243
1 changed files with 109 additions and 123 deletions

View File

@ -25,69 +25,58 @@ from cPickle import loads, dumps
from django.db import models
from attribute_aggregator.xacml_constants import *
from attribute_aggregator.models import AttributeSource
from acs.attribute_aggregator.xacml_constants import *
from acs.attribute_aggregator.models import AttributeSource
class AssertionAny(models.Model):
class AssertionData:
def __init__(self, attribute_data=None):
self.attribute_data = attribute_data
def get_assertion_instance(self):
try:
return self.assertiondefinition
except:
pass
try:
return self.assertiondata
except:
pass
return None
class AssertionData(AssertionAny):
attribute_data = models.TextField(null=True, blank=True)
# def __init__(self, attribute_data=None, *args, **kwargs):
# super(AssertionData, self).__init__(*args, **kwargs)
# if attribute_data:
# self.attribute_data = dumps(attribute_data)
def get_attribute_data(self):
return loads(str(self.attribute_data))
return self.attribute_data
def set_attribute_data(self, attribute_data):
if attribute_data:
self.attribute_data = dumps(attribute_data)
self.save()
return self.get_attribute_data()
return None
self.attribute_data = attribute_data
def __unicode__(self):
data = self.get_attribute_data()
return data.__unicode__()
class AssertionDefinition(AssertionAny):
definition = models.CharField(max_length=200)
sources = models.ManyToManyField(AttributeSource, null=True, blank=True)
class AssertionDefinition:
def __init__(self, definition=None, sources=[]):
self.definition = definition
self.sources = []
for source in sources:
self.add_source(source)
def add_source(self, source):
if not source in self.sources.all():
self.sources.add(source)
self.save()
return 0
if not source.id in self.sources:
self.sources.append(source.id)
def get_definition(self):
return self.definition
def remove_source(self, source):
if source in self.sources.all():
self.sources.delete(source)
self.save()
return 0
if source.id in self.sources:
self.sources.pop(source.id)
def get_sources(self):
result = []
for key in self.sources:
try:
source_ = AttributeSource.objects.get(id=key)
result.append(source_)
except:
pass
return result
def __unicode__(self):
s = self.definition
if self.sources:
s += ' from '
for source in self.sources.all():
for source in self.get_sources():
s = s + source.name + ', '
if len(s) > 6:
s = s[:-2]
@ -96,89 +85,92 @@ class AssertionDefinition(AssertionAny):
return s
'''
An ABAC rule is a string containing logical statements (and, or, not) and
the identifiers of predicates.
To make feasible to use the the model instance identifiers of predicates,
all predicates objects have a unique identifiers with a model defined per
predicate type, a parent model is used: Predicate.
All predicate types (e.g. PredicateRequired) herit from Predicate.
The identifiers used are thus the identifiers of the instances of the
model Predicate
'''
class Predicate(models.Model):
rule = models.ForeignKey('AbacRule')
def get_predicate_instance(self):
try:
return self.predicaterole
except:
pass
try:
return self.predicaterequired
except:
pass
try:
return self.predicatecomparison
except:
pass
return None
'''
To say that an attribute is required
'''
class PredicateRequired(Predicate):
assertion_definition = models.ForeignKey(AssertionDefinition)
single_value = models.BooleanField(default=False)
class PredicateRequired:
def __init__(self, assertion_definition=None, single_value=False, sources=[]):
self.assertion_definition = assertion_definition
self.single_value = single_value
self.sources = []
for source in sources:
self.add_source(source)
def get_assertion_definition(self):
return self.assertion_definition
def set_assertion_definition(self, assertion_definition=None):
self.assertion_definition = assertion_definition
def get_definition(self):
return self.assertion_definition.definition
if self.assertion_definition:
return self.assertion_definition.definition
return None
def get_sources(self):
return self.assertion_definition.sources.all()
definition = self.assertion_definition
if definition:
return definition.get_sources()
return None
def __unicode__(self):
if self.single_value:
return "Predicate required: %s - \
The attribute must be single-valued" \
% str(self.assertion_definition)
return "Predicate required: %s" % str(self.assertion_definition)
% str(self.get_definition())
return "Predicate required: %s" % str(self.get_definition())
class PredicateRole(Predicate):
class PredicateRole:
'''
Role are only handled from the ACS role tree
Else, use attributes equality with roles provided as attributes from
sources.
'''
role = models.ForeignKey('acs.Role')
def __init__(self, role=None):
self.role = None
if role and role.id:
self.role = role.id
def get_role(self):
try:
from acs.models import Role
return Role.objects.get(id=self.role)
except:
return None
def __unicode__(self):
return "Predicate role on %s" % str(self.role)
return "Predicate role on %s" % str(self.get_role())
class PredicateComparison(Predicate):
operand1 = models.ForeignKey(AssertionAny, related_name = 'operand1')
operand2 = models.ForeignKey(AssertionAny, related_name = 'operand2')
operand1_single_value = models.BooleanField(default=False)
operand2_single_value = models.BooleanField(default=False)
comparison_type = models.CharField(max_length = 100,
choices = XACML_COMPARISON_TYPE,
verbose_name = 'type of comparison',
default = ACS_XACML_COMPARISON_EQUALITY_STRING)
multivalues = models.CharField(max_length = 100,
verbose_name = 'How to handle multivalued attributes',
default = 'NO_MULTIVALUES')
multivalues_explanation = models.CharField(max_length = 500, blank=True)
class PredicateComparison:
def __init__(self, operand1=None, operand2=None,
operand1_single_value=False, operand2_single_value=False,
comparison_type=ACS_XACML_COMPARISON_EQUALITY_STRING,
multivalues='NO_MULTIVALUES',
multivalues_explanation=None):
self.operand1 = operand1
self.operand2 = operand2
self.operand1_single_value = operand1_single_value
self.operand2_single_value = operand2_single_value
self.comparison_type = comparison_type
self.multivalues = multivalues
self.multivalues_explanation = multivalues_explanation
def get_operand1(self):
return self.operand1
def set_operand1(self, operand1=None):
self.operand1 = operand1
def get_operand2(self):
return self.operand2
def set_operand2(self, operand2=None):
self.operand2 = operand2
def __unicode__(self):
operator = ''
@ -193,8 +185,8 @@ class PredicateComparison(Predicate):
elif self.comparison_type in ACS_XACML_COMPARISON_GRT_OE:
operator = '>='
s = 'Predicate comparison: %s %s %s (' \
% (str(self.operand1.get_assertion_instance()),
operator, str(self.operand2.get_assertion_instance()))
% (self.get_operand1().__unicode__(),
operator, self.get_operand2().__unicode__())
if self.operand1_single_value:
s += 'operand one requires a single-valued attribute - '
if self.operand2_single_value:
@ -211,36 +203,30 @@ class PredicateComparison(Predicate):
'''
Predicate are str(id) of attribute definitions
An ABAC rule is a string containing logical statements (and, or, not) and
the identifiers of predicates.
'''
class AbacRule(models.Model):
expression = models.CharField(max_length = 2048)
expression = models.CharField(max_length = 2048, null=True, blank=True)
predicates = models.CharField(max_length = 4096, null=True, blank=True)
def get_predicates(self):
if not self.predicates:
return []
return loads(str(self.predicates))
def add_predicate(self, predicate=None):
if predicate:
predicates = self.get_predicates()
predicates.append(predicate)
self.predicates = dumps(predicates)
def __unicode__(self):
predicates1 = {}
predicates2 = {}
count = 1
expression = self.expression
'''
Here we substitute primary keys with the predicate description.
The substitution might fail if some descriptions contains
primary keys since with an iterative substitution, a
primary key resulting from a previsous substitution would be
replaced by another description.
To prevent this we realize a two-round substitution with unique
identifiers.
'''
for p in Predicate.objects.filter(rule=self):
rdm_str = ''.join(\
random.choice(string.ascii_uppercase) for x in range(8))
predicates1[p.id] = rdm_str
predicates2[rdm_str] = str(p.get_predicate_instance())
for key in predicates1.keys():
expression = re.sub(str(key), str(predicates1[key]),
expression)
for key in predicates2.keys():
expression = re.sub(str(key), str(predicates2[key]), expression)
for predicate in self.get_predicates():
expression = re.sub(str(count), predicate.__unicode__(), expression)
count = count + 1
return expression