[Abac] Use a unique persistent object to store a rule.
This commit is contained in:
parent
97252a3610
commit
6b3b67b243
|
@ -25,69 +25,58 @@ from cPickle import loads, dumps
|
|||
|
||||
from django.db import models
|
||||
|
||||
from attribute_aggregator.xacml_constants import *
|
||||
from attribute_aggregator.models import AttributeSource
|
||||
from acs.attribute_aggregator.xacml_constants import *
|
||||
from acs.attribute_aggregator.models import AttributeSource
|
||||
|
||||
|
||||
class AssertionAny(models.Model):
|
||||
class AssertionData:
|
||||
def __init__(self, attribute_data=None):
|
||||
self.attribute_data = attribute_data
|
||||
|
||||
def get_assertion_instance(self):
|
||||
try:
|
||||
return self.assertiondefinition
|
||||
except:
|
||||
pass
|
||||
try:
|
||||
return self.assertiondata
|
||||
except:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
class AssertionData(AssertionAny):
|
||||
attribute_data = models.TextField(null=True, blank=True)
|
||||
# def __init__(self, attribute_data=None, *args, **kwargs):
|
||||
# super(AssertionData, self).__init__(*args, **kwargs)
|
||||
# if attribute_data:
|
||||
# self.attribute_data = dumps(attribute_data)
|
||||
def get_attribute_data(self):
|
||||
return loads(str(self.attribute_data))
|
||||
return self.attribute_data
|
||||
|
||||
def set_attribute_data(self, attribute_data):
|
||||
if attribute_data:
|
||||
self.attribute_data = dumps(attribute_data)
|
||||
self.save()
|
||||
return self.get_attribute_data()
|
||||
return None
|
||||
self.attribute_data = attribute_data
|
||||
|
||||
def __unicode__(self):
|
||||
data = self.get_attribute_data()
|
||||
return data.__unicode__()
|
||||
|
||||
|
||||
class AssertionDefinition(AssertionAny):
|
||||
definition = models.CharField(max_length=200)
|
||||
sources = models.ManyToManyField(AttributeSource, null=True, blank=True)
|
||||
class AssertionDefinition:
|
||||
def __init__(self, definition=None, sources=[]):
|
||||
self.definition = definition
|
||||
self.sources = []
|
||||
for source in sources:
|
||||
self.add_source(source)
|
||||
|
||||
def add_source(self, source):
|
||||
if not source in self.sources.all():
|
||||
self.sources.add(source)
|
||||
self.save()
|
||||
return 0
|
||||
if not source.id in self.sources:
|
||||
self.sources.append(source.id)
|
||||
|
||||
def get_definition(self):
|
||||
return self.definition
|
||||
|
||||
def remove_source(self, source):
|
||||
if source in self.sources.all():
|
||||
self.sources.delete(source)
|
||||
self.save()
|
||||
return 0
|
||||
if source.id in self.sources:
|
||||
self.sources.pop(source.id)
|
||||
|
||||
def get_sources(self):
|
||||
result = []
|
||||
for key in self.sources:
|
||||
try:
|
||||
source_ = AttributeSource.objects.get(id=key)
|
||||
result.append(source_)
|
||||
except:
|
||||
pass
|
||||
return result
|
||||
|
||||
def __unicode__(self):
|
||||
s = self.definition
|
||||
if self.sources:
|
||||
s += ' from '
|
||||
for source in self.sources.all():
|
||||
for source in self.get_sources():
|
||||
s = s + source.name + ', '
|
||||
if len(s) > 6:
|
||||
s = s[:-2]
|
||||
|
@ -96,89 +85,92 @@ class AssertionDefinition(AssertionAny):
|
|||
return s
|
||||
|
||||
|
||||
'''
|
||||
An ABAC rule is a string containing logical statements (and, or, not) and
|
||||
the identifiers of predicates.
|
||||
|
||||
To make feasible to use the the model instance identifiers of predicates,
|
||||
all predicates objects have a unique identifiers with a model defined per
|
||||
predicate type, a parent model is used: Predicate.
|
||||
|
||||
All predicate types (e.g. PredicateRequired) herit from Predicate.
|
||||
|
||||
The identifiers used are thus the identifiers of the instances of the
|
||||
model Predicate
|
||||
'''
|
||||
|
||||
|
||||
class Predicate(models.Model):
|
||||
rule = models.ForeignKey('AbacRule')
|
||||
|
||||
def get_predicate_instance(self):
|
||||
try:
|
||||
return self.predicaterole
|
||||
except:
|
||||
pass
|
||||
try:
|
||||
return self.predicaterequired
|
||||
except:
|
||||
pass
|
||||
try:
|
||||
return self.predicatecomparison
|
||||
except:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
'''
|
||||
To say that an attribute is required
|
||||
'''
|
||||
|
||||
|
||||
class PredicateRequired(Predicate):
|
||||
assertion_definition = models.ForeignKey(AssertionDefinition)
|
||||
single_value = models.BooleanField(default=False)
|
||||
class PredicateRequired:
|
||||
def __init__(self, assertion_definition=None, single_value=False, sources=[]):
|
||||
self.assertion_definition = assertion_definition
|
||||
self.single_value = single_value
|
||||
self.sources = []
|
||||
for source in sources:
|
||||
self.add_source(source)
|
||||
|
||||
def get_assertion_definition(self):
|
||||
return self.assertion_definition
|
||||
|
||||
def set_assertion_definition(self, assertion_definition=None):
|
||||
self.assertion_definition = assertion_definition
|
||||
|
||||
def get_definition(self):
|
||||
return self.assertion_definition.definition
|
||||
if self.assertion_definition:
|
||||
return self.assertion_definition.definition
|
||||
return None
|
||||
|
||||
def get_sources(self):
|
||||
return self.assertion_definition.sources.all()
|
||||
definition = self.assertion_definition
|
||||
if definition:
|
||||
return definition.get_sources()
|
||||
return None
|
||||
|
||||
def __unicode__(self):
|
||||
if self.single_value:
|
||||
return "Predicate required: %s - \
|
||||
The attribute must be single-valued" \
|
||||
% str(self.assertion_definition)
|
||||
return "Predicate required: %s" % str(self.assertion_definition)
|
||||
% str(self.get_definition())
|
||||
return "Predicate required: %s" % str(self.get_definition())
|
||||
|
||||
|
||||
class PredicateRole(Predicate):
|
||||
class PredicateRole:
|
||||
'''
|
||||
Role are only handled from the ACS role tree
|
||||
|
||||
Else, use attributes equality with roles provided as attributes from
|
||||
sources.
|
||||
'''
|
||||
role = models.ForeignKey('acs.Role')
|
||||
def __init__(self, role=None):
|
||||
self.role = None
|
||||
if role and role.id:
|
||||
self.role = role.id
|
||||
|
||||
def get_role(self):
|
||||
try:
|
||||
from acs.models import Role
|
||||
return Role.objects.get(id=self.role)
|
||||
except:
|
||||
return None
|
||||
|
||||
def __unicode__(self):
|
||||
return "Predicate role on %s" % str(self.role)
|
||||
return "Predicate role on %s" % str(self.get_role())
|
||||
|
||||
|
||||
class PredicateComparison(Predicate):
|
||||
operand1 = models.ForeignKey(AssertionAny, related_name = 'operand1')
|
||||
operand2 = models.ForeignKey(AssertionAny, related_name = 'operand2')
|
||||
operand1_single_value = models.BooleanField(default=False)
|
||||
operand2_single_value = models.BooleanField(default=False)
|
||||
comparison_type = models.CharField(max_length = 100,
|
||||
choices = XACML_COMPARISON_TYPE,
|
||||
verbose_name = 'type of comparison',
|
||||
default = ACS_XACML_COMPARISON_EQUALITY_STRING)
|
||||
multivalues = models.CharField(max_length = 100,
|
||||
verbose_name = 'How to handle multivalued attributes',
|
||||
default = 'NO_MULTIVALUES')
|
||||
multivalues_explanation = models.CharField(max_length = 500, blank=True)
|
||||
class PredicateComparison:
|
||||
def __init__(self, operand1=None, operand2=None,
|
||||
operand1_single_value=False, operand2_single_value=False,
|
||||
comparison_type=ACS_XACML_COMPARISON_EQUALITY_STRING,
|
||||
multivalues='NO_MULTIVALUES',
|
||||
multivalues_explanation=None):
|
||||
self.operand1 = operand1
|
||||
self.operand2 = operand2
|
||||
self.operand1_single_value = operand1_single_value
|
||||
self.operand2_single_value = operand2_single_value
|
||||
self.comparison_type = comparison_type
|
||||
self.multivalues = multivalues
|
||||
self.multivalues_explanation = multivalues_explanation
|
||||
|
||||
def get_operand1(self):
|
||||
return self.operand1
|
||||
|
||||
def set_operand1(self, operand1=None):
|
||||
self.operand1 = operand1
|
||||
|
||||
def get_operand2(self):
|
||||
return self.operand2
|
||||
|
||||
def set_operand2(self, operand2=None):
|
||||
self.operand2 = operand2
|
||||
|
||||
def __unicode__(self):
|
||||
operator = ''
|
||||
|
@ -193,8 +185,8 @@ class PredicateComparison(Predicate):
|
|||
elif self.comparison_type in ACS_XACML_COMPARISON_GRT_OE:
|
||||
operator = '>='
|
||||
s = 'Predicate comparison: %s %s %s (' \
|
||||
% (str(self.operand1.get_assertion_instance()),
|
||||
operator, str(self.operand2.get_assertion_instance()))
|
||||
% (self.get_operand1().__unicode__(),
|
||||
operator, self.get_operand2().__unicode__())
|
||||
if self.operand1_single_value:
|
||||
s += 'operand one requires a single-valued attribute - '
|
||||
if self.operand2_single_value:
|
||||
|
@ -211,36 +203,30 @@ class PredicateComparison(Predicate):
|
|||
|
||||
|
||||
'''
|
||||
Predicate are str(id) of attribute definitions
|
||||
An ABAC rule is a string containing logical statements (and, or, not) and
|
||||
the identifiers of predicates.
|
||||
'''
|
||||
|
||||
|
||||
class AbacRule(models.Model):
|
||||
expression = models.CharField(max_length = 2048)
|
||||
expression = models.CharField(max_length = 2048, null=True, blank=True)
|
||||
predicates = models.CharField(max_length = 4096, null=True, blank=True)
|
||||
|
||||
def get_predicates(self):
|
||||
if not self.predicates:
|
||||
return []
|
||||
return loads(str(self.predicates))
|
||||
|
||||
def add_predicate(self, predicate=None):
|
||||
if predicate:
|
||||
predicates = self.get_predicates()
|
||||
predicates.append(predicate)
|
||||
self.predicates = dumps(predicates)
|
||||
|
||||
def __unicode__(self):
|
||||
predicates1 = {}
|
||||
predicates2 = {}
|
||||
count = 1
|
||||
expression = self.expression
|
||||
'''
|
||||
Here we substitute primary keys with the predicate description.
|
||||
|
||||
The substitution might fail if some descriptions contains
|
||||
primary keys since with an iterative substitution, a
|
||||
primary key resulting from a previsous substitution would be
|
||||
replaced by another description.
|
||||
|
||||
To prevent this we realize a two-round substitution with unique
|
||||
identifiers.
|
||||
'''
|
||||
for p in Predicate.objects.filter(rule=self):
|
||||
rdm_str = ''.join(\
|
||||
random.choice(string.ascii_uppercase) for x in range(8))
|
||||
predicates1[p.id] = rdm_str
|
||||
predicates2[rdm_str] = str(p.get_predicate_instance())
|
||||
for key in predicates1.keys():
|
||||
expression = re.sub(str(key), str(predicates1[key]),
|
||||
expression)
|
||||
for key in predicates2.keys():
|
||||
expression = re.sub(str(key), str(predicates2[key]), expression)
|
||||
for predicate in self.get_predicates():
|
||||
expression = re.sub(str(count), predicate.__unicode__(), expression)
|
||||
count = count + 1
|
||||
return expression
|
||||
|
|
Reference in New Issue