diff --git a/acs/management/commands/initialize-acs.py b/acs/management/commands/initialize-acs.py
index 8066c2f..4ee33ec 100644
--- a/acs/management/commands/initialize-acs.py
+++ b/acs/management/commands/initialize-acs.py
@@ -17,23 +17,16 @@
along with this program. If not, see .
'''
+
import getpass
-import json
-from pprint import pprint
from optparse import make_option
from django.db import transaction
from django.core.management.base import BaseCommand, CommandError
from django.contrib.auth.models import User
-from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
-from acs import settings
from acs.models import View, AcsPermission, Namespace, Action, \
UserAlias, Role
-from acs.abac.models import AttributeNamespace, AttributeDefinition, \
- AttributeMapInNamespace, AttributeName
-from acs.abac.core import get_namespace_by_identifier, \
- get_namespace_by_friendly_name
class Command(BaseCommand):
@@ -41,32 +34,17 @@ class Command(BaseCommand):
Initialization script:
- Initialize Acs.
- Create a root user with all rights.
- - Inject the attribute definitions in the database.
Run with:
- python manage.py initialize-acs [--existing] \
- [--without-abac] [--reload-abac]
+ python manage.py initialize-acs [--existing]
--existing
The user already exists in the database.
Else, the user is created.
-
- --reload-abac
- Skip the initial configuration and update the
- attribute definitions.
-
- *** Be sure to use --reload-abac option if it is not the first
- time the script is run and it is not expected to reinitialize
- ACS ***
-
- --without-abac
- Only the initial configuration.
-
Reinitialization:
python manage.py sqlclear acs | python manage.py dbshell \
&& python manage.py syncdb \
- && python manage.py initialize-acs [--existing] \
- [--without-abac] [--reload-abac]
+ && python manage.py initialize-acs [--existing]
If you want to clear all the attribute definitions use
python manage.py sqlclear abac
@@ -94,18 +72,6 @@ class Command(BaseCommand):
dest='existing',
default=False,
help='Indicate if it is an existing user'),
- ) + (
- make_option('--without-abac',
- action='store_true',
- dest='without-abac',
- default=False,
- help='Indicate if it is an existing user'),
- ) + (
- make_option('--reload-abac',
- action='store_true',
- dest='reload-abac',
- default=False,
- help='Indicate if it is an existing user'),
)
args = ''
help = \
@@ -114,252 +80,147 @@ class Command(BaseCommand):
@transaction.commit_manually
def handle(self, *args, **options):
- if not options['reload-abac']:
- try:
- if not args:
- raise CommandError('No username on the command line')
-
- username = args[0]
- user = None
-
- if options['existing']:
- print 'Look for the existing user %s' %username
- try:
- user = User.objects.get(username=username)
- print 'User found: %s' %user
- except:
- raise CommandError('Unable to get existing user %s' \
- %username)
- else:
- print 'Creation of user %s' %username
- try:
- user = User.objects.get(username=username)
- except:
- pass
- if user:
- raise CommandError('Already existing user %s' %user)
-
- MAX_TRIES = 3
- count = 0
- p1, p2 = 1, 2
- while p1 != p2 and count < MAX_TRIES:
- p1 = getpass.getpass(prompt="Password: ")
- if not p1:
- raise CommandError("aborted")
- p2 = getpass.getpass(prompt="Password (again): ")
- if not p2:
- raise CommandError("aborted")
- if p1 != p2:
- print "Passwords do not match. Please try again."
- count = count + 1
-
- if count == MAX_TRIES:
- raise CommandError("Aborting creation of user '%s' after \
- %s attempts" % (username, count))
-
- try:
- user = User(username=username)
- user.set_password(p1)
- user.save()
- print 'User created: %s' %user
- except:
- raise CommandError('Unable to create user %s' %username)
-
- ns = None
- try:
- ns = Namespace(name='Default')
- ns.save()
- print 'Default namespace created: %s' %ns
- except:
- raise CommandError('Unable to create the default namespace')
-
-
- alias = None
- try:
- '''The alias in Default has the username as name'''
- alias = UserAlias(alias=user.username,
- user=user, namespace=ns)
- alias.save()
- print 'Default alias created: %s' %alias
- except:
- raise CommandError('Unable to create the default alias')
-
- #view_name = user.username+'_admin_view'
- view_name = 'root_admin_view'
- view = None
- try:
- view = View(name=view_name, namespace=ns)
- view.save()
- print 'Root system view created: %s' %view
- except:
- raise CommandError('Unable to create root system view')
-
- view.users.add(alias)
- print \
- 'Default alias of the root user added to its root system view'
-
- #role_name = user.username+'_admin_role'
- role_name = 'root_admin_role'
- role=None
- try:
- role = Role(name=role_name, namespace=ns)
- role.save()
- print 'Root system role created: %s' %role
- except:
- raise CommandError('Unable to create root system role')
-
- view.roles.add(role)
- print 'Default role added to the root system view'
-
- a = None
- try:
- a = Action(name='administration')
- a.save()
- print 'Administration action created: %s' %a
- except:
- raise \
- CommandError('Unable to create the administration action')
-
- p = None
- try:
- p = AcsPermission(who=role, what=view, how=a)
- p.save()
- print 'Root administration permission created: %s' %p
- except:
- raise \
- CommandError('Unable to create the root administration \
- permission for the root administration role')
-
- role.users.add(alias)
- print 'User added to the root role'
-
- ua = None
- try:
- ua = Role(name='root_user_administrator_role')
- ua.save()
- print 'Special role user administrator created: %s' %ua
- except:
- raise \
- CommandError('Unable to create special \
- role user administrator')
-
- ua = None
- try:
- ua = Role(name='root_abac_administrator_role')
- ua.save()
- print 'Special role abac administrator created: %s' %ua
- except:
- raise \
- CommandError('Unable to create special \
- role abac administrator')
-
- except:
- transaction.rollback()
- raise
- else:
- transaction.commit()
- print '---> Successful main initialization'
-
- if options['without-abac']:
- raise CommandError('No ABAC, exit')
-
try:
- print '---> Start Attribute Based Access Control application initialisation'
+ if not args:
+ raise CommandError('No username on the command line')
- json_data = open('acs/abac/namespaces.json')
- data = json.load(json_data)
- print '--> Namespace file contains:'
- pprint(data)
+ username = args[0]
+ user = None
- for ns in data:
- print 'Looking for ns with friendly name %s and identifier %s' \
- % (ns, data[ns])
- ok = False
+ if options['existing']:
+ print 'Look for the existing user %s' %username
try:
- AttributeNamespace.objects.get(friendly_name=ns)
- print '- existing namespace with friendly name %s, not updated' %ns
- except ObjectDoesNotExist:
- ok = True
- except MultipleObjectsReturned:
- print '###ERROR: multiple namespace with the friendly name %s' %ns
- if ok:
- ok = False
- try:
- AttributeNamespace.objects.get(identifier=data[ns])
- print '- existing namespace with identifier %s, not updated' %data[ns]
- except ObjectDoesNotExist:
- ok = True
- except MultipleObjectsReturned:
- print '###ERROR: multiple namespace with the identifier %s' %data[ns]
- if ok:
- nsi = AttributeNamespace(friendly_name=ns, identifier=data[ns])
- nsi.save()
- print '+ namespace added'
+ user = User.objects.get(username=username)
+ print 'User found: %s' %user
+ except:
+ raise CommandError('Unable to get existing user %s' \
+ %username)
+ else:
+ print 'Creation of user %s' %username
+ try:
+ user = User.objects.get(username=username)
+ except:
+ pass
+ if user:
+ raise CommandError('Already existing user %s' %user)
- json_data.close()
+ MAX_TRIES = 3
+ count = 0
+ p1, p2 = 1, 2
+ while p1 != p2 and count < MAX_TRIES:
+ p1 = getpass.getpass(prompt="Password: ")
+ if not p1:
+ raise CommandError("aborted")
+ p2 = getpass.getpass(prompt="Password (again): ")
+ if not p2:
+ raise CommandError("aborted")
+ if p1 != p2:
+ print "Passwords do not match. Please try again."
+ count = count + 1
- json_data = open('acs/abac/attribute_mapping.json')
- data = json.load(json_data)
- print '--> Attribute mapping file contains:'
- pprint(data)
+ if count == MAX_TRIES:
+ raise CommandError("Aborting creation of user '%s' after \
+ %s attempts" % (username, count))
- for attribute in data:
- print 'Dealing with attribute %s' %attribute
- for attr_decl in data[attribute]:
- print 'Working on %s' %attr_decl
+ try:
+ user = User(username=username)
+ user.set_password(p1)
+ user.save()
+ print 'User created: %s' %user
+ except:
+ raise CommandError('Unable to create user %s' %username)
- t = attr_decl['type']
- print 'Loof for a definition of %s of type %s' \
- % (attribute, t)
+ ns = None
+ try:
+ ns = Namespace(name='Default')
+ ns.save()
+ print 'Default namespace created: %s' %ns
+ except:
+ raise CommandError('Unable to create the default namespace')
- d, c = AttributeDefinition.objects.get_or_create(attribute_name=attribute)
- if c:
- print "+ definition created"
- else:
- print "- existing definition"
- if d.attribute_type != t:
- d.attribute_type = t
- d.save()
+ alias = None
+ try:
+ '''The alias in Default has the username as name'''
+ alias = UserAlias(alias=user.username,
+ user=user, namespace=ns)
+ alias.save()
+ print 'Default alias created: %s' %alias
+ except:
+ raise CommandError('Unable to create the default alias')
- definitions = attr_decl['definitions']
- if not definitions:
- print "Missing definitions"
- else:
- print "Working with definitions %s" %definitions
- for ns_decl in definitions:
- ns_name = ns_decl[0]
- print 'Namespace declared is %s' %ns_name
- ns = None
- ns = get_namespace_by_friendly_name(ns_name)
- if not ns:
- ns = get_namespace_by_identifier(ns_name)
- if not ns:
- print 'Namespace with name or id %s not found' %ns_name
- else:
- print 'Namespace found %s' %ns
+ #view_name = user.username+'_admin_view'
+ view_name = 'root_admin_view'
+ view = None
+ try:
+ view = View(name=view_name, namespace=ns)
+ view.save()
+ print 'Root system view created: %s' %view
+ except:
+ raise CommandError('Unable to create root system view')
- m, c = AttributeMapInNamespace.objects.get_or_create(namespace=ns,
- attribute_definition=d)
- if c:
- print "+ map created"
- else:
- print "- existing map"
- names = ns_decl[1]
- for name in names:
- print 'Found name %s' %name
- a, c = AttributeName.objects.get_or_create(attribute_map=m, name=name)
- if c:
- print "+ name created"
- else:
- print "- existing name"
+ view.users.add(alias)
+ print \
+ 'Default alias of the root user added to its root system view'
- json_data.close()
+ #role_name = user.username+'_admin_role'
+ role_name = 'root_admin_role'
+ role=None
+ try:
+ role = Role(name=role_name, namespace=ns)
+ role.save()
+ print 'Root system role created: %s' %role
+ except:
+ raise CommandError('Unable to create root system role')
- except Exception, err:
+ view.roles.add(role)
+ print 'Default role added to the root system view'
+
+ a = None
+ try:
+ a = Action(name='administration')
+ a.save()
+ print 'Administration action created: %s' %a
+ except:
+ raise \
+ CommandError('Unable to create the administration action')
+
+ p = None
+ try:
+ p = AcsPermission(who=role, what=view, how=a)
+ p.save()
+ print 'Root administration permission created: %s' %p
+ except:
+ raise \
+ CommandError('Unable to create the root administration \
+ permission for the root administration role')
+
+ role.users.add(alias)
+ print 'User added to the root role'
+
+ ua = None
+ try:
+ ua = Role(name='root_user_administrator_role')
+ ua.save()
+ print 'Special role user administrator created: %s' %ua
+ except:
+ raise \
+ CommandError('Unable to create special \
+ role user administrator')
+
+ ua = None
+ try:
+ ua = Role(name='root_abac_administrator_role')
+ ua.save()
+ print 'Special role abac administrator created: %s' %ua
+ except:
+ raise \
+ CommandError('Unable to create special \
+ role abac administrator')
+
+ except:
transaction.rollback()
- print '---> Error due to %s' %str(err)
- print '---> All transactions cancelled'
+ raise
else:
transaction.commit()
- print '---> Successful ABAC initialization'
+ print '---> Successful main initialization'
diff --git a/acs/management/commands/request-acs.py b/acs/management/commands/request-acs.py
index 0c49a7b..dbb90b0 100644
--- a/acs/management/commands/request-acs.py
+++ b/acs/management/commands/request-acs.py
@@ -26,28 +26,7 @@ from django.db import transaction
from acs import settings
-from acs.abac.models import *
-from acs.abac.core import check_predicates, \
- arrange_missing_predicates, get_source_form_name, \
- get_def_from_name_and_ns, add_assertion_to_profile, \
- make_new_rule_from_missing_predicates, \
- get_attribute_definition_by_name, load_profile_by_dic, \
- check_predicate_role, load_or_create_user_profile
-
-from acs.abac.logic import evaluation, return_sorted_variables_to_truth
-
-from acs.xacml.constants import *
-
-from acs.models import Role, UserAlias, AcsObject, Action, \
- AcsAbacPermission, Namespace
-
-
-from acs.core import create_policy, remove_policy, \
- add_role, mod_role, add_object, add_action, add_permission, \
- add_view, add_activity, mod_view, mod_activity, \
- is_authorized_by_names_with_abac
-
-from acs.signals import attributes_call
+from acs.core import is_authorized_by_names_with_abac
class Command(BaseCommand):
'''
diff --git a/acs/management/commands/test-abac.py b/acs/management/commands/test-abac.py
index 5b69115..0e3d974 100644
--- a/acs/management/commands/test-abac.py
+++ b/acs/management/commands/test-abac.py
@@ -23,19 +23,20 @@ import time
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
+from django.contrib.auth.models import User
+
+from attribute_aggregator.xacml_constants import *
+from attribute_aggregator.models import AttributeSource, LdapSource, \
+ AttributeData
+from attribute_aggregator.core import set_user_alias_in_source
from acs.abac.models import *
from acs.abac.core import check_predicates, \
- arrange_missing_predicates, get_source_form_name, \
- get_def_from_name_and_ns, add_assertion_to_profile, \
+ arrange_missing_predicates, \
make_new_rule_from_missing_predicates, \
- get_attribute_definition_by_name, load_profile_by_dic, \
check_predicate_role
-
from acs.abac.logic import evaluation, return_sorted_variables_to_truth
-from acs.xacml.constants import *
-
from acs.models import Role, UserAlias, AcsObject, Action, AcsAbacPermission
from acs.core import create_policy, remove_policy, \
@@ -62,6 +63,7 @@ class Command(BaseCommand):
print '-------- ABAC Tests --------'
'''
+ Exemple:
(age of (IdP1 or IdP2) >= 18 and (nationality of IdP1 == 'FRA'
or nationality of IdP1 == 'ITA'))
or
@@ -79,231 +81,71 @@ class Command(BaseCommand):
try:
- s1, c = Source.objects.get_or_create(name="IdP1")
- s1.save()
- print 'Source created: %s' % s1
- s2, c = Source.objects.get_or_create(name="IdP2")
- s2.save()
- print 'Source created: %s' % s2
- s3, c = Source.objects.get_or_create(name="IdP3")
- s3.save()
- print 'Source created: %s' % s3
+ s1, c = AttributeSource.objects.get_or_create(name="IdP1")
+ print 'AttributeSource created: %s' % s1
+ s2, c = AttributeSource.objects.get_or_create(name="IdP2")
+ print 'AttributeSource created: %s' % s2
+ s4,c = LdapSource.objects.get_or_create(name="LDAP1", server="127.0.0.1",
+ base="dc=entrouvert,dc=lan")
+ print 'LdapSource created: %s' % s4
rule = AbacRule()
rule.save()
- def_sn = get_attribute_definition_by_name('surname')
- def_fn = get_attribute_definition_by_name('firstname')
- def_typecert = get_attribute_definition_by_name('certificate_type')
- def_age = get_attribute_definition_by_name('age')
- def_unikid = get_attribute_definition_by_name('unique_ID')
- def_nat = get_attribute_definition_by_name('nationality')
- if not (def_sn and def_fn and def_age and def_typecert and def_nat and def_unikid):
- print 'Missing definition'
- raise
-
- adef_sn1 = AssertionDefinition(attribute_definition=def_sn)
+ adef_sn1 = AssertionDefinition(definition='surname')
adef_sn1.save()
- att_s = AttachedSource(assertion=adef_sn1, source=s1)
- att_s.save()
- adef_sn2 = AssertionDefinition(attribute_definition=def_sn)
+ adef_sn1.add_source(s1)
+ print "AssertionDefinition: %s" % adef_sn1
+ adef_sn2 = AssertionDefinition(definition='surname')
adef_sn2.save()
- att_s = AttachedSource(assertion=adef_sn2, source=s2)
- att_s.save()
- adef_sn3 = AssertionDefinition(attribute_definition=def_sn)
- adef_sn3.save()
- att_s = AttachedSource(assertion=adef_sn3, source=s3)
- att_s.save()
- adef_fn1 = AssertionDefinition(attribute_definition=def_fn)
- adef_fn1.save()
- att_s = AttachedSource(assertion=adef_fn1, source=s1)
- att_s.save()
- adef_fn2 = AssertionDefinition(attribute_definition=def_fn)
- adef_fn2.save()
- att_s = AttachedSource(assertion=adef_fn2, source=s2)
- att_s.save()
- adef_fn3 = AssertionDefinition(attribute_definition=def_fn)
- adef_fn3.save()
- att_s = AttachedSource(assertion=adef_fn3, source=s3)
- att_s.save()
- adef_typecert1 = AssertionDefinition(attribute_definition=def_typecert)
- adef_typecert1.save()
- att_s = AttachedSource(assertion=adef_typecert1, source=s1)
- att_s.save()
- adef_typecert2 = AssertionDefinition(attribute_definition=def_typecert)
- adef_typecert2.save()
- att_s = AttachedSource(assertion=adef_typecert2, source=s2)
- att_s.save()
- adef_typecert3 = AssertionDefinition(attribute_definition=def_typecert)
- adef_typecert3.save()
- att_s = AttachedSource(assertion=adef_typecert3, source=s3)
- att_s.save()
- adef_age1 = AssertionDefinition(attribute_definition=def_age)
- adef_age1.save()
- att_s = AttachedSource(assertion=adef_age1, source=s1)
- att_s.save()
- att_s = AttachedSource(assertion=adef_age1, source=s2)
- att_s.save()
- adef_unikid1 = AssertionDefinition(attribute_definition=def_unikid)
- adef_unikid1.save()
- att_s = AttachedSource(assertion=adef_unikid1, source=s1)
- att_s.save()
- att_s = AttachedSource(assertion=adef_unikid1, source=s2)
- att_s.save()
- adef_nationality1 = AssertionDefinition(attribute_definition=def_nat)
- adef_nationality1.save()
- att_s = AttachedSource(assertion=adef_nationality1, source=s1)
- att_s.save()
+ adef_sn2.add_source(s4)
+ print "AssertionDefinition: %s" % adef_sn2
- #Define numeric value 18
- val18 = AttributeData(definition=def_age)
- val18.save()
- IntegerM(data=val18, value=18).save()
- val18_d = AssertionData(attribute_data=val18)
- val18_d.save()
- #Define numeric value 21
- val21 = AttributeData(definition=def_age)
- val21.save()
- IntegerM(data=val21, value=21).save()
- val21_d = AssertionData(attribute_data=val21)
- val21_d.save()
- #Define String for nationalities
- french = AttributeData(definition=def_nat)
- french.save()
- StringM(data=french, value='FRA').save()
- french_d = AssertionData(attribute_data=french)
- french_d.save()
- italian = AttributeData(definition=def_nat)
- italian.save()
- StringM(data=italian, value='ITA').save()
- italian_d = AssertionData(attribute_data=italian)
- italian_d.save()
- english = AttributeData(definition=def_nat)
- english.save()
- StringM(data=english, value='GBR').save()
- english_d = AssertionData(attribute_data=english)
- english_d.save()
- spanish = AttributeData(definition=def_nat)
- spanish.save()
- StringM(data=spanish, value='ESP').save()
- spanish_d = AssertionData(attribute_data=spanish)
- spanish_d.save()
- #Define String for certificate types
- eID = AttributeData(definition=def_typecert)
- eID.save()
- StringM(data=eID, value='eID').save()
- eID_d = AssertionData(attribute_data=eID)
- eID_d.save()
- drvlicence = AttributeData(definition=def_typecert)
- drvlicence.save()
- StringM(data=drvlicence, value='driver_licence').save()
- drvlicence_d = AssertionData(attribute_data=drvlicence)
- drvlicence_d.save()
- assass = AttributeData(definition=def_typecert)
- assass.save()
- StringM(data=assass, value='assurance_assessment').save()
- assass_d = AssertionData(attribute_data=assass)
- assass_d.save()
-
- #predicates for age
- p1 = PredicateComparison(operand1=adef_age1, operand2=val18_d,
- comparison_type=ACS_XACML_COMPARISON_INTEGER_GRT_OE,
- operand1_single_value=True, operand2_single_value=True,
+ p1 = PredicateRequired(assertion_definition=adef_sn1,
rule=rule)
p1.save()
- print "Predicate age 1 %s, id: %s" % (p1, p1.id)
- p2 = PredicateComparison(operand1=adef_age1, operand2=val21_d,
- comparison_type=ACS_XACML_COMPARISON_INTEGER_GRT_OE,
- operand1_single_value=True, operand2_single_value=True,
+ print "PredicateRequired: %s" % p1
+ p2 = PredicateRequired(assertion_definition=adef_sn2,
rule=rule)
p2.save()
- print "Predicate age 2 %s, id: %s" % (p2, p2.id)
- #predicates for nationality:
- p3 = PredicateComparison(operand1=adef_nationality1, operand2=french_d,
+ print "PredicateRequired: %s" % p2
+
+ p3 = PredicateComparison(operand1=adef_sn1, operand2=adef_sn2,
comparison_type=ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE,
multivalues='EQUAL_OP1_SUBSET_OP2', rule=rule)
p3.save()
- print "Predicate nationality 3 %s, id: %s" % (p3, p3.id)
- p4 = PredicateComparison(operand1=adef_nationality1, operand2=italian_d,
- comparison_type=ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE,
- multivalues='EQUAL_OP1_SUBSET_OP2', rule=rule)
- p4.save()
- print "Predicate nationality 4 %s, id: %s" % (p4, p4.id)
- p5 = PredicateComparison(operand1=adef_nationality1, operand2=english_d,
- comparison_type=ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE,
- multivalues='EQUAL_OP1_SUBSET_OP2', rule=rule)
- p5.save()
- print "Predicate nationality 5 %s, id: %s" % (p5, p5.id)
- p6 = PredicateComparison(operand1=adef_nationality1, operand2=spanish_d,
- comparison_type=ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE,
- multivalues='EQUAL_OP1_SUBSET_OP2', rule=rule)
- p6.save()
- print "Predicate nationality 6 %s, id: %s" % (p6, p6.id)
- #Predicates only required
- p7 = PredicateRequired(definition=adef_unikid1,
- single_value=True, rule=rule)
- p7.save()
- print "Predicate required unique ID: %s, id: %s" % (p7, p7.id)
- #Predicates for certificate types
- p8 = PredicateComparison(operand1=adef_typecert1, operand2=eID_d,
- comparison_type=ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE,
- operand1_single_value=True, operand2_single_value=True,
- rule=rule)
- p8.save()
- print "Predicate certificate type 8 %s, id: %s" % (p8, p8.id)
- p9 = PredicateComparison(operand1=adef_typecert2, operand2=drvlicence_d,
- comparison_type=ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE,
- operand1_single_value=True, operand2_single_value=True,
- rule=rule)
- p9.save()
- print "Predicate certificate type 9 %s, id: %s" % (p9, p9.id)
- p10 = PredicateComparison(operand1=adef_typecert3, operand2=assass_d,
- comparison_type=ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE,
- operand1_single_value=True, operand2_single_value=True,
- rule=rule)
- p10.save()
- print "Predicate certificate type 10 %s, id: %s" % (p10, p10.id)
- #Predicates for same identity checking
- p11 = PredicateComparison(operand1=adef_sn1, operand2=adef_sn2,
- comparison_type=ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE, rule=rule,
- multivalues='EQUAL_EXACT_MATCH')
- p11.save()
- print "Predicate check identity 11 %s, id: %s" % (p11, p11.id)
- p12 = PredicateComparison(operand1=adef_fn1, operand2=adef_fn2,
- comparison_type=ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE, rule=rule,
- multivalues='EQUAL_OP1_SUBSET_OP2')
- p12.save()
- print "Predicate check identity 12 %s, id: %s" % (p12, p12.id)
- p13 = PredicateComparison(operand1=adef_sn1, operand2=adef_sn3,
- comparison_type=ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE, rule=rule,
- multivalues='EQUAL_EXACT_MATCH')
- p13.save()
- print "Predicate check identity 13 %s, id: %s" % (p13, p13.id)
- p14 = PredicateComparison(operand1=adef_fn1, operand2=adef_fn3,
- comparison_type=ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE, rule=rule,
- multivalues='EQUAL_OP1_SUBSET_OP2')
- p14.save()
- print "Predicate check identity 14 %s, id: %s" % (p14, p14.id)
- #Bags of values to test multivalued attributes
- bag1 = AttributeData(definition=def_age)
- bag1.save()
- IntegerM(data=bag1, value=3).save()
- IntegerM(data=bag1, value=5).save()
- bag1_d = AssertionData(attribute_data=bag1)
- bag1_d.save()
- bag2 = AttributeData(definition=def_age)
- bag2.save()
- IntegerM(data=bag2, value=4).save()
- IntegerM(data=bag2, value=5).save()
- IntegerM(data=bag2, value=6).save()
- bag2_d = AssertionData(attribute_data=bag2)
- bag2_d.save()
- p15 = PredicateComparison(operand1=bag1_d, operand2=bag2_d,
- comparison_type=ACS_XACML_COMPARISON_INTEGER_GRT, rule=rule,
- multivalues='DIFF_ONE_OP1_WITH_BOTTOM_LIMIT_OP2')
- p15.save()
- print "Predicate test comp 15 %s, id: %s" % (p15, p15.id)
+ adef_age1 = AssertionDefinition(definition='age')
+ adef_age1.save()
+ adef_age1.add_source(s1)
+ print "AssertionDefinition: %s" % adef_sn1
+
+ val18 = AttributeData(definition='age', values=(str(18),))
+ print "AttributeData: %s" % val18.__unicode__()
+ val18_d = AssertionData()
+ val18_d.set_attribute_data(val18)
+ val18_d.save()
+ print "AssertionData: %s" % val18_d
+
+ p4 = PredicateComparison(operand1=adef_age1, operand2=val18_d,
+ comparison_type=ACS_XACML_COMPARISON_INTEGER_GRT_OE,
+ operand1_single_value=True, operand2_single_value=True,
+ rule=rule)
+ p4.save()
+
+ adef_fn1 = AssertionDefinition(definition='firstname')
+ adef_fn1.save()
+ adef_fn1.add_source(s1)
+ print "AssertionDefinition: %s" % adef_fn1
+ adef_fn2 = AssertionDefinition(definition='firstname')
+ adef_fn2.save()
+ adef_fn2.add_source(s2)
+ print "AssertionDefinition: %s" % adef_fn2
+
+ p5 = PredicateComparison(operand1=adef_fn1, operand2=adef_fn2,
+ comparison_type=ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE,
+ multivalues='EQUAL_EXACT_MATCH', rule=rule)
+ p5.save()
print '--> Create a user'
rdm_str = ''.join(random.choice(string.ascii_uppercase + \
@@ -317,7 +159,7 @@ class Command(BaseCommand):
print "User %s created" % user
print '<--\n'
- requester = User.objects.get(username='root')
+ requester = User.objects.get(username='mikael')
print '--> Create one policy'
name = 'policy_' + rdm_str
@@ -337,6 +179,11 @@ class Command(BaseCommand):
print "Alias %s created" % alias
print '<--\n'
+ print '--> Set user in the LDAP source'
+ set_user_alias_in_source(user, s4,
+ 'uid=mikael,ou=people,dc=entrouvert,dc=lan',
+ force_change=True)
+
print '--> Create two roles'
role_name_1 = 'role_1_' + rdm_str
role_1 = add_role(requester, role_name_1, policy)
@@ -375,10 +222,23 @@ class Command(BaseCommand):
p16.save()
print "Predicate role 16 %s, id: %s" % (p16, p16.id)
- str_rule = "(((%s&(%s|%s))|(%s&(%s|%s)))&(%s&%s&%s&%s&%s&%s&%s&%s&%s)|%s)" % (p1.id, p3.id, p4.id, p2.id, p5.id, p6.id, p7.id, p8.id, p9.id, p10.id, p11.id, p12.id, p13.id, p14.id, p15.id, p16.id)
+ '''
+ Set ABAC Rule
+ '''
+ str_rule = "%s&%s&%s&%s&%s&%s" % (p1.id, p2.id, p3.id, p4.id, p5.id, p16.id)
+ #TODO: set_rule (strip space!)
rule.expression=str_rule
rule.save()
+ '''
+ r1 -- user
+ \_ r2
+ v1 -- o1
+ \_ v2 -- o2
+ A1 -- a1
+ \_ A2 -- a2
+ '''
+
object_1_name = 'object_1_' + rdm_str
object_1 = add_object(requester, object_1_name, policy)
if not object_1:
@@ -460,27 +320,16 @@ class Command(BaseCommand):
'''
Add permission to policy
'''
- AcsAbacPermission(who=None, what=view_2, how=activity_2, rule=rule).save()
+ AcsAbacPermission(who=None, what=view_1, how=activity_1, rule=rule).save()
'''
Request policy
'''
attributes = {}
+
data = []
attr = {}
- attr['name'] = 'certificate_type'
- attr['namespace'] = 'ACS-ABAC'
- attr['values'] = ('eID',)
- data.append(attr)
- attr = {}
- attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/dateofbirth'
- attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
- attr['values'] = ('22', )
- exp = datetime.datetime.now() + datetime.timedelta(days=1)
- attr['expiration_date'] = exp.isoformat()
- data.append(attr)
- attr = {}
attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/surname'
attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
attr['values'] = ('Ates',)
@@ -491,137 +340,33 @@ class Command(BaseCommand):
attr['values'] = ('Mikael', 'Ersin',)
data.append(attr)
attr = {}
- attr['name'] = 'Nationality'
- attr['namespace'] = 'ISO7501-1'
- attr['values'] = ('FRA',)
+ attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/dateofbirth'
+ attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
+ attr['values'] = ('19', )
data.append(attr)
attributes['IdP1'] = data
data = []
attr = {}
- attr['name'] = 'certificate_type'
- attr['namespace'] = 'ACS-ABAC'
- attr['values'] = ('driver_licence',)
- data.append(attr)
- attr = {}
- attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/surname'
- attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
- attr['values'] = ('Ates',)
- data.append(attr)
- attr = {}
attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/givenname'
attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
attr['values'] = ('Mikael', 'Ersin',)
data.append(attr)
- attr = {}
- attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/privatepersonalidentifier'
- attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
- attr['values'] = ('31q3sdf1q3sdf213q2d1f5qs',)
- data.append(attr)
-# attr = {}
-# attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/privatepersonalidentifier'
-# attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
-# attr['values'] = ('41q3sdf1q3sdf213q2d1f5qs',)
-# data['attributes'].append(attr)
- attr = {}
- attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/dateofbirth'
- attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
- attr['values'] = ('17', )
- data.append(attr)
attributes['IdP2'] = data
- data = []
- attr = {}
- attr['name'] = 'certificate_type'
- attr['namespace'] = 'ACS-ABAC'
- attr['values'] = ('assurance_assessment',)
- data.append(attr)
- attr = {}
- attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/surname'
- attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
- attr['values'] = ('Ates',)
- data.append(attr)
- attr = {}
- attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/givenname'
- attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
- attr['values'] = ('Mikael', 'Ersin', 'Bob')
- data.append(attr)
- attributes['IdP3'] = data
-
decision, msg, error = \
is_authorized_by_names_with_abac(requestor_name=user.username,
who_name=user.username,
what_name=object_1_name,
how_name=action_1_name,
namespace_name=namespace,
- attributes=attributes,
- no_rule_returned=True)
- if error < 0:
- raise CommandError('is_authorized_by_names_with_abac returned %s' % str(error))
- if decision:
- print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
- print "Access granted by permission %s" % msg
- print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
- else:
- print "------------------------------------------------------"
- print "Access denied, new rule to satisfy %s" % msg
- print "------------------------------------------------------"
-
- decision, msg, error = \
- is_authorized_by_names_with_abac(requestor_name=user.username,
- who_name=user.username,
- what_name=view_1_name, view=True,
- how_name=activity_1_name, activity = True,
- namespace_name=namespace,
- attributes=attributes,
- no_rule_returned=True)
- if error < 0:
- raise CommandError('is_authorized_by_names_with_abac returned %s' % str(error))
- if decision:
- print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
- print "Access granted by permission %s" % msg
- print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
- else:
- print "------------------------------------------------------"
- print "Access denied, new rule to satisfy %s" % msg
- print "------------------------------------------------------"
-
- decision, msg, error = \
- is_authorized_by_names_with_abac(requestor_name=user.username,
- who_name=user.username,
- what_name=object_2_name,
- how_name=action_2_name,
- namespace_name=namespace,
- attributes=attributes,
- no_rule_returned=True)
- if error < 0:
- raise CommandError('is_authorized_by_names_with_abac returned %s' % str(error))
- if decision:
- print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
- print "Access granted by permission %s" % msg
- print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
- else:
- print "------------------------------------------------------"
- print "Access denied, new rule to satisfy %s" % msg
- print "------------------------------------------------------"
-
- decision, msg, error = \
- is_authorized_by_names_with_abac(requestor_name=user.username,
- who_name=user.username,
- what_name=view_2_name, view=True,
- how_name=activity_2_name, activity=True,
- namespace_name=namespace,
attributes=attributes)
if error < 0:
raise CommandError('is_authorized_by_names_with_abac returned %s' % str(error))
if decision:
- print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
- print "Access granted by permission %s" % msg
- print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
+ print "+++ Access granted by permission %s" % msg
else:
- print "------------------------------------------------------"
- print "Access denied, new rule to satisfy %s" % msg
- print "------------------------------------------------------"
+ print "--- Access denied, new rule to satisfy %s" % msg
except Exception, err:
print "Exception: %s" %str(err)