# This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
# veridic/acs/management/commands/test-abac.py

# 379 lines
# 15 KiB
# Python

'''
VERIDIC - Towards a centralized access control system
Copyright (C) 2011 Mikael Ates
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
'''
import datetime
import time
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from django.contrib.auth.models import User
from attribute_aggregator.xacml_constants import *
from attribute_aggregator.models import AttributeSource, LdapSource, \
AttributeData
from attribute_aggregator.core import set_user_alias_in_source
from acs.abac.models import *
from acs.abac.core import check_predicates, \
arrange_missing_predicates, \
make_new_rule_from_missing_predicates, \
check_predicate_role
from acs.abac.logic import evaluation, return_sorted_variables_to_truth
from acs.models import Role, UserAlias, AcsObject, Action, AcsAbacPermission
from acs.core import create_policy, remove_policy, \
add_role, mod_role, add_object, add_action, add_permission, \
add_view, add_activity, mod_view, mod_activity, \
is_authorized_by_names_with_abac
class Command(BaseCommand):
    '''
    Script to make tests on ABAC.

    The command builds a complete scenario (attribute sources, an ABAC
    rule made of several predicates, a user, a policy with roles, objects,
    actions, views and activities, and one ABAC permission) and then asks
    is_authorized_by_names_with_abac() for a decision.  The whole scenario
    runs in a manually managed transaction that is always rolled back.
    '''
    can_import_django_settings = True
    output_transaction = True
    requires_model_validation = True
    option_list = BaseCommand.option_list
    help = 'No help.'

    @staticmethod
    def _require(result, message):
        '''
        Return result unchanged, or raise CommandError when it is falsy.

        message must contain a single %s placeholder; it is filled with
        str(result).
        '''
        if not result:
            raise CommandError(message % str(result))
        return result

    @staticmethod
    def _require_success(status, message):
        '''
        Raise CommandError when status is a negative return code.

        message must contain a single %s placeholder; it is filled with
        str(status).
        '''
        if status < 0:
            raise CommandError(message % str(status))

    @transaction.commit_manually
    def handle(self, *args, **options):
        '''
        Run the ABAC scenario, printing every step on standard output.

        Exceptions raised by the scenario (CommandError included) are
        printed, not propagated.  The transaction is rolled back whatever
        the outcome.
        '''
        # Imported locally because the file has no explicit import for these
        # modules; otherwise the random suffix below only works if the names
        # happen to leak through one of the star imports.
        import random
        import string

        print('-------- ABAC Tests --------')
        # Example:
        # (age of (IdP1 or IdP2) >= 18 and (nationality of IdP1 == 'FRA'
        #     or nationality of IdP1 == 'ITA'))
        # or
        # (age of (IdP1 or IdP2) >= 21 and (nationality of IdP1 == 'GBR'
        #     or nationality of IdP1 == 'ESP'))
        # and unique_ID of (IdP1 or IdP2)
        # and type_cert of IdP1 == 'eID'
        # and type_cert of IdP2 == 'driver_licence'
        # and type_cert of IdP3 == 'assurance_assessment'
        # and surname of IdP1 == surname of IdP2
        # and firstname of IdP1 == firstname of IdP2
        # and surname of IdP1 == surname of IdP3
        # and firstname of IdP1 == firstname of IdP3
        try:
            # Attribute sources referenced by the assertion definitions.
            s1, created = AttributeSource.objects.get_or_create(name="IdP1")
            print('AttributeSource created: %s' % s1)
            s2, created = AttributeSource.objects.get_or_create(name="IdP2")
            print('AttributeSource created: %s' % s2)
            s4, created = LdapSource.objects.get_or_create(
                name="LDAP1", server="127.0.0.1",
                base="dc=entrouvert,dc=lan")
            print('LdapSource created: %s' % s4)

            # The rule is saved first so that predicates can reference it;
            # its expression is set once all predicate ids are known.
            rule = AbacRule()
            rule.save()

            # Predicates 1-3: surname required from IdP1 and from the LDAP
            # source, and both surnames must match.
            adef_sn1 = AssertionDefinition(definition='surname')
            adef_sn1.save()
            adef_sn1.add_source(s1)
            print("AssertionDefinition: %s" % adef_sn1)
            adef_sn2 = AssertionDefinition(definition='surname')
            adef_sn2.save()
            adef_sn2.add_source(s4)
            print("AssertionDefinition: %s" % adef_sn2)
            p1 = PredicateRequired(assertion_definition=adef_sn1,
                rule=rule)
            p1.save()
            print("PredicateRequired: %s" % p1)
            p2 = PredicateRequired(assertion_definition=adef_sn2,
                rule=rule)
            p2.save()
            print("PredicateRequired: %s" % p2)
            p3 = PredicateComparison(operand1=adef_sn1, operand2=adef_sn2,
                comparison_type=ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE,
                multivalues='EQUAL_OP1_SUBSET_OP2', rule=rule)
            p3.save()

            # Predicate 4: age from IdP1 compared with the constant 18.
            adef_age1 = AssertionDefinition(definition='age')
            adef_age1.save()
            adef_age1.add_source(s1)
            print("AssertionDefinition: %s" % adef_age1)
            val18 = AttributeData(definition='age', values=(str(18),))
            print("AttributeData: %s" % val18.__unicode__())
            val18_d = AssertionData()
            val18_d.set_attribute_data(val18)
            val18_d.save()
            print("AssertionData: %s" % val18_d)
            p4 = PredicateComparison(operand1=adef_age1, operand2=val18_d,
                comparison_type=ACS_XACML_COMPARISON_INTEGER_GRT_OE,
                operand1_single_value=True, operand2_single_value=True,
                rule=rule)
            p4.save()

            # Predicate 5: firstname from IdP1 and IdP2 must match.
            adef_fn1 = AssertionDefinition(definition='firstname')
            adef_fn1.save()
            adef_fn1.add_source(s1)
            print("AssertionDefinition: %s" % adef_fn1)
            adef_fn2 = AssertionDefinition(definition='firstname')
            adef_fn2.save()
            adef_fn2.add_source(s2)
            print("AssertionDefinition: %s" % adef_fn2)
            p5 = PredicateComparison(operand1=adef_fn1, operand2=adef_fn2,
                comparison_type=ACS_XACML_COMPARISON_EQUALITY_STRING_IGN_CASE,
                multivalues='EQUAL_EXACT_MATCH', rule=rule)
            p5.save()

            print('--> Create a user')
            # Random suffix shared by every name created in this run.
            rdm_str = ''.join(
                random.choice(string.ascii_uppercase + string.digits)
                for x in range(8))
            username = 'user_' + rdm_str
            user, created = User.objects.get_or_create(username=username)
            self._require(user, 'Error creating user %s')
            user.set_password(username)
            user.save()
            print("User %s created" % user)
            print('<--\n')

            # The requester is a hard-coded, pre-existing account; if it is
            # missing, the lookup raises and the scenario stops here.
            requester = User.objects.get(username='mikael')

            print('--> Create one policy')
            name = 'policy_' + rdm_str
            namespace = 'namespace_' + rdm_str
            policy, created = create_policy(name, requester,
                namespace=namespace)
            self._require(policy, 'Error creating policy %s')
            print("Policy %s created" % policy)
            print('<--\n')

            print('--> Set user in policy')
            alias, created = UserAlias.objects.get_or_create(user=user,
                alias=user.username, namespace=policy.namespace)
            self._require(alias, 'Error creating alias %s')
            print("Alias %s created" % alias)
            print('<--\n')

            print('--> Set user in the LDAP source')
            set_user_alias_in_source(user, s4,
                'uid=mikael,ou=people,dc=entrouvert,dc=lan',
                force_change=True)

            print('--> Create two roles')
            role_name_1 = 'role_1_' + rdm_str
            role_1 = self._require(
                add_role(requester, role_name_1, policy),
                'Unable to handle a role due to %s')
            print("Role %s created" % role_1)
            role_name_2 = 'role_2_' + rdm_str
            role_2 = self._require(
                add_role(requester, role_name_2, policy),
                'Unable to handle a role due to %s')
            print("Role %s created" % role_2)
            print('<--\n')

            print('--> Add role2 to role_1')
            r = mod_role(requester, role_1, policy,
                roles_added=[role_2])
            self._require_success(r, 'Unable to handle a role due to %s')
            print("Role %s added to %s" % (role_2, role_1))
            print('<--\n')

            print('--> Add user to the role_1')
            r = mod_role(requester, role_1, policy,
                users_added=[alias])
            self._require_success(r, 'Unable to handle a role due to %s')
            print("User %s added to %s" % (alias, role_1))
            print('<--\n')

            print('--> Create one predicateRole')
            p16 = PredicateRole(role=role_2, rule=rule)
            p16.save()
            print("Predicate role 16 %s, id: %s" % (p16, p16.id))

            # Set ABAC Rule: conjunction of all the predicates built above.
            str_rule = "%s&%s&%s&%s&%s&%s" % (
                p1.id, p2.id, p3.id, p4.id, p5.id, p16.id)
            #TODO: set_rule (strip space!)
            rule.expression = str_rule
            rule.save()

            # r1 -- user
            #  \_ r2
            # v1 -- o1
            #  \_ v2 -- o2
            # A1 -- a1
            #  \_ A2 -- a2
            object_1_name = 'object_1_' + rdm_str
            object_1 = self._require(
                add_object(requester, object_1_name, policy),
                'Unable to handle an object due to %s')
            object_2_name = 'object_2_' + rdm_str
            object_2 = self._require(
                add_object(requester, object_2_name, policy),
                'Unable to handle an object due to %s')
            action_1_name = 'action_1_' + rdm_str
            action_1 = self._require(
                add_action(requester, action_1_name, policy),
                'Unable to handle an action due to %s')
            action_2_name = 'action_2_' + rdm_str
            action_2 = self._require(
                add_action(requester, action_2_name, policy),
                'Unable to handle an action due to %s')
            view_1_name = 'view_1_' + rdm_str
            view_1 = self._require(
                add_view(requester, view_1_name, policy),
                'Unable to handle an view due to %s')
            view_2_name = 'view_2_' + rdm_str
            view_2 = self._require(
                add_view(requester, view_2_name, policy),
                'Unable to handle an view due to %s')
            activity_1_name = 'activity_1_' + rdm_str
            activity_1 = self._require(
                add_activity(requester, activity_1_name, policy),
                'Unable to handle an activity due to %s')
            activity_2_name = 'activity_2_' + rdm_str
            activity_2 = self._require(
                add_activity(requester, activity_2_name, policy),
                'Unable to handle an activity due to %s')

            print('--> Add object 2 to view 2')
            r = mod_view(requester, view_2, policy,
                objects_added=[object_2])
            self._require_success(r, 'Unable to handle a view due to %s')
            print("Object %s added to %s" % (object_2, view_2))
            print('<--\n')

            print('--> Add object 1 and view 2 to view 1')
            r = mod_view(requester, view_1, policy,
                objects_added=[object_1], views_added=[view_2])
            self._require_success(r, 'Unable to handle a view due to %s')
            print("Object %s and view %s added to %s"
                % (object_1, view_2, view_1))
            print('<--\n')

            print('--> Add action 2 to activity 2')
            r = mod_activity(requester, activity_2, policy,
                actions_added=[action_2])
            self._require_success(r,
                'Unable to handle an activity due to %s')
            print("Action %s added to %s" % (action_2, activity_2))
            print('<--\n')

            print('--> Add action 1 and activity 2 to activity 1')
            r = mod_activity(requester, activity_1, policy,
                actions_added=[action_1], activities_added=[activity_2])
            self._require_success(r,
                'Unable to handle an activity due to %s')
            print("Action %s and activity %s added to %s"
                % (action_1, activity_2, activity_1))
            print('<--\n')

            # Add permission to policy
            AcsAbacPermission(who=None, what=view_1, how=activity_1,
                rule=rule).save()

            # Request policy: attributes presented for each source name.
            claims_ns = \
                'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
            attributes = {
                'IdP1': [
                    {
                        'name': claims_ns + '/surname',
                        'namespace': claims_ns,
                        'values': ('Ates',),
                    },
                    {
                        'name': claims_ns + '/givenname',
                        'namespace': claims_ns,
                        'values': ('Mikael', 'Ersin',),
                    },
                    {
                        'name': claims_ns + '/dateofbirth',
                        'namespace': claims_ns,
                        'values': ('19', ),
                    },
                ],
                'IdP2': [
                    {
                        'name': claims_ns + '/givenname',
                        'namespace': claims_ns,
                        'values': ('Mikael', 'Ersin',),
                    },
                ],
            }
            decision, msg, error = \
                is_authorized_by_names_with_abac(
                    requestor_name=user.username,
                    who_name=user.username,
                    what_name=object_1_name,
                    how_name=action_1_name,
                    namespace_name=namespace,
                    attributes=attributes)
            self._require_success(error,
                'is_authorized_by_names_with_abac returned %s')
            if decision:
                print("+++ Access granted by permission %s" % msg)
            else:
                print("--- Access denied, new rule to satisfy %s" % msg)
        except Exception as err:
            print("Exception: %s" % str(err))
        else:
            print("Happy end")
        finally:
            # Nothing created by the test is meant to persist.  Rolling back
            # here also covers failures inside the handlers above, which
            # would otherwise leave the manual transaction pending.
            transaction.rollback()
        print('\n-------- DONE --------')