247 lines
7.8 KiB
Python
247 lines
7.8 KiB
Python
'''
|
|
VERIDIC - Towards a centralized access control system
|
|
|
|
Copyright (C) 2011 Mikael Ates
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU Affero General Public License as
|
|
published by the Free Software Foundation, either version 3 of the
|
|
License, or (at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU Affero General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Affero General Public License
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
'''
|
|
|
|
import re
|
|
import random
|
|
import string
|
|
|
|
from cPickle import loads, dumps
|
|
|
|
from django.db import models
|
|
|
|
from attribute_aggregator.xacml_constants import *
|
|
from attribute_aggregator.models import AttributeSource
|
|
|
|
|
|
class AssertionAny(models.Model):
|
|
|
|
def get_assertion_instance(self):
|
|
try:
|
|
return self.assertiondefinition
|
|
except:
|
|
pass
|
|
try:
|
|
return self.assertiondata
|
|
except:
|
|
pass
|
|
return None
|
|
|
|
|
|
class AssertionData(AssertionAny):
|
|
attribute_data = models.TextField(null=True, blank=True)
|
|
# def __init__(self, attribute_data=None, *args, **kwargs):
|
|
# super(AssertionData, self).__init__(*args, **kwargs)
|
|
# if attribute_data:
|
|
# self.attribute_data = dumps(attribute_data)
|
|
def get_attribute_data(self):
|
|
return loads(str(self.attribute_data))
|
|
|
|
def set_attribute_data(self, attribute_data):
|
|
if attribute_data:
|
|
self.attribute_data = dumps(attribute_data)
|
|
self.save()
|
|
return self.get_attribute_data()
|
|
return None
|
|
|
|
def __unicode__(self):
|
|
data = self.get_attribute_data()
|
|
return data.__unicode__()
|
|
|
|
|
|
class AssertionDefinition(AssertionAny):
|
|
definition = models.CharField(max_length=200)
|
|
sources = models.ManyToManyField(AttributeSource, null=True, blank=True)
|
|
|
|
def add_source(self, source):
|
|
if not source in self.sources.all():
|
|
self.sources.add(source)
|
|
self.save()
|
|
return 0
|
|
|
|
def get_definition(self):
|
|
return self.definition
|
|
|
|
def remove_source(self, source):
|
|
if source in self.sources.all():
|
|
self.sources.delete(source)
|
|
self.save()
|
|
return 0
|
|
|
|
def __unicode__(self):
|
|
s = self.definition
|
|
if self.sources:
|
|
s += ' from '
|
|
for source in self.sources.all():
|
|
s = s + source.name + ', '
|
|
if len(s) > 6:
|
|
s = s[:-2]
|
|
else:
|
|
s = self.definition
|
|
return s
|
|
|
|
|
|
'''
|
|
An ABAC rule is a string containing logical statements (and, or, not) and
|
|
the identifiers of predicates.
|
|
|
|
To make feasible to use the the model instance identifiers of predicates,
|
|
all predicates objects have a unique identifiers with a model defined per
|
|
predicate type, a parent model is used: Predicate.
|
|
|
|
All predicate types (e.g. PredicateRequired) herit from Predicate.
|
|
|
|
The identifiers used are thus the identifiers of the instances of the
|
|
model Predicate
|
|
'''
|
|
|
|
|
|
class Predicate(models.Model):
|
|
rule = models.ForeignKey('AbacRule')
|
|
|
|
def get_predicate_instance(self):
|
|
try:
|
|
return self.predicaterole
|
|
except:
|
|
pass
|
|
try:
|
|
return self.predicaterequired
|
|
except:
|
|
pass
|
|
try:
|
|
return self.predicatecomparison
|
|
except:
|
|
pass
|
|
return None
|
|
|
|
|
|
'''
|
|
To say that an attribute is required
|
|
'''
|
|
|
|
|
|
class PredicateRequired(Predicate):
|
|
assertion_definition = models.ForeignKey(AssertionDefinition)
|
|
single_value = models.BooleanField(default=False)
|
|
|
|
def get_definition(self):
|
|
return self.assertion_definition.definition
|
|
|
|
def get_sources(self):
|
|
return self.assertion_definition.sources.all()
|
|
|
|
def __unicode__(self):
|
|
if self.single_value:
|
|
return "Predicate required: %s - \
|
|
The attribute must be single-valued" \
|
|
% str(self.assertion_definition)
|
|
return "Predicate required: %s" % str(self.assertion_definition)
|
|
|
|
|
|
class PredicateRole(Predicate):
|
|
'''
|
|
Role are only handled from the ACS role tree
|
|
|
|
Else, use attributes equality with roles provided as attributes from
|
|
sources.
|
|
'''
|
|
role = models.ForeignKey('acs.Role')
|
|
|
|
def __unicode__(self):
|
|
return "Predicate role on %s" % str(self.role)
|
|
|
|
|
|
class PredicateComparison(Predicate):
|
|
operand1 = models.ForeignKey(AssertionAny, related_name = 'operand1')
|
|
operand2 = models.ForeignKey(AssertionAny, related_name = 'operand2')
|
|
operand1_single_value = models.BooleanField(default=False)
|
|
operand2_single_value = models.BooleanField(default=False)
|
|
comparison_type = models.CharField(max_length = 100,
|
|
choices = XACML_COMPARISON_TYPE,
|
|
verbose_name = 'type of comparison',
|
|
default = ACS_XACML_COMPARISON_EQUALITY_STRING)
|
|
multivalues = models.CharField(max_length = 100,
|
|
verbose_name = 'How to handle multivalued attributes',
|
|
default = 'NO_MULTIVALUES')
|
|
multivalues_explanation = models.CharField(max_length = 500, blank=True)
|
|
|
|
def __unicode__(self):
|
|
operator = ''
|
|
if self.comparison_type in XACML_COMPARISON_EQUALITY:
|
|
operator = '='
|
|
elif self.comparison_type in ACS_XACML_COMPARISON_LT:
|
|
operator = '<'
|
|
elif self.comparison_type in ACS_XACML_COMPARISON_LT_OE:
|
|
operator = '<='
|
|
elif self.comparison_type in ACS_XACML_COMPARISON_GRT:
|
|
operator = '>'
|
|
elif self.comparison_type in ACS_XACML_COMPARISON_GRT_OE:
|
|
operator = '>='
|
|
s = 'Predicate comparison: %s %s %s (' \
|
|
% (str(self.operand1.get_assertion_instance()),
|
|
operator, str(self.operand2.get_assertion_instance()))
|
|
if self.operand1_single_value:
|
|
s += 'operand one requires a single-valued attribute - '
|
|
if self.operand2_single_value:
|
|
s += 'operand two requires a single-valued attribute - '
|
|
if not self.operand1_single_value or not self.operand2_single_value:
|
|
if not self.multivalues_explanation:
|
|
s += 'multivalues management is %s' \
|
|
% self.multivalues
|
|
else:
|
|
s += 'The multivalues management is as follows: %s' \
|
|
% self.multivalues_explanation
|
|
s += ')'
|
|
return s
|
|
|
|
|
|
'''
|
|
Predicate are str(id) of attribute definitions
|
|
'''
|
|
|
|
|
|
class AbacRule(models.Model):
|
|
expression = models.CharField(max_length = 2048)
|
|
|
|
def __unicode__(self):
|
|
predicates1 = {}
|
|
predicates2 = {}
|
|
expression = self.expression
|
|
'''
|
|
Here we substitute primary keys with the predicate description.
|
|
|
|
The substitution might fail if some descriptions contains
|
|
primary keys since with an iterative substitution, a
|
|
primary key resulting from a previsous substitution would be
|
|
replaced by another description.
|
|
|
|
To prevent this we realize a two-round substitution with unique
|
|
identifiers.
|
|
'''
|
|
for p in Predicate.objects.filter(rule=self):
|
|
rdm_str = ''.join(\
|
|
random.choice(string.ascii_uppercase) for x in range(8))
|
|
predicates1[p.id] = rdm_str
|
|
predicates2[rdm_str] = str(p.get_predicate_instance())
|
|
for key in predicates1.keys():
|
|
expression = re.sub(str(key), str(predicates1[key]),
|
|
expression)
|
|
for key in predicates2.keys():
|
|
expression = re.sub(str(key), str(predicates2[key]), expression)
|
|
return expression
|