New abac test script

This commit is contained in:
Mikaël Ates 2011-09-02 18:47:05 +02:00
parent bda42675e0
commit 4abdb74719
1 changed files with 257 additions and 95 deletions

View File

@ -17,7 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
'''
from django.core.management.base import BaseCommand
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from acs.abac.models import *
@ -25,12 +26,20 @@ from acs.abac.core import check_predicates, \
arrange_missing_predicates, get_source_form_name, \
get_def_from_name_and_ns, add_assertion_to_profile, \
make_new_rule_from_missing_predicates, \
get_attribute_definition_by_name, load_profile_by_dic
get_attribute_definition_by_name, load_profile_by_dic, \
check_predicate_role
from acs.abac.logic import evaluation, return_sorted_variables_to_truth
from acs.xacml.constants import *
from acs.models import Role, UserAlias, AcsObject, Action, AcsAbacPermission
from acs.core import create_policy, remove_policy, \
add_role, mod_role, add_object, add_action, add_permission, \
add_view, add_activity, mod_view, mod_activity, \
is_authorized_by_names_with_abac
class Command(BaseCommand):
'''
@ -293,85 +302,217 @@ class Command(BaseCommand):
p15.save()
print "Predicate test comp 15 %s, id: %s" % (p15, p15.id)
str_rule = "((%s&(%s|%s))|(%s&(%s|%s)))&(%s&%s&%s&%s&%s&%s&%s&%s&%s)" % (p1.id, p3.id, p4.id, p2.id, p5.id, p6.id, p7.id, p8.id, p9.id, p10.id, p11.id, p12.id, p13.id, p14.id, p15.id)
print '--> Create a user'
rdm_str = ''.join(random.choice(string.ascii_uppercase + \
string.digits) for x in range(8))
username = 'user_' + rdm_str
user, created = User.objects.get_or_create(username=username)
if not user:
raise CommandError('Error creating user %s' % user)
user.set_password(username)
user.save()
print "User %s created" % user
print '<--\n'
requester = User.objects.get(username='root')
print '--> Create one policy'
name = 'policy_' + rdm_str
namespace = 'namespace_' + rdm_str
policy, created = create_policy(name, requester,
namespace=namespace)
if not policy:
raise CommandError('Error creating policy %s' % policy)
print "Policy %s created" % policy
print '<--\n'
print '--> Set user in policy'
alias, created = UserAlias.objects.get_or_create(user=user,
alias=user.username, namespace=policy.namespace)
if not alias:
raise CommandError('Error creating alias %s' % alias)
print "Alias %s created" % alias
print '<--\n'
print '--> Create two roles'
role_name_1 = 'role_1_' + rdm_str
role_1 = add_role(requester, role_name_1, policy)
if not role_1:
raise CommandError('Unable to handle a role due to %s' \
% str(role_1))
print "Role %s created" % role_1
role_name_2 = 'role_2_' + rdm_str
role_2 = add_role(requester, role_name_2, policy)
if not role_2:
raise CommandError('Unable to handle a role due to %s' \
% str(role_2))
print "Role %s created" % role_2
print '<--\n'
print '--> Add role2 to role_1'
r = mod_role(requester, role_1, policy,
roles_added=[role_2])
if r < 0:
raise CommandError(\
'Unable to handle a role due to %s' % str(r))
print "Role %s added to %s" % (role_2, role_1)
print '<--\n'
print '--> Add user to the role_1'
r = mod_role(requester, role_1, policy,
users_added=[alias])
if r < 0:
raise CommandError(\
'Unable to handle a role due to %s' % str(r))
print "User %s added to %s" % (alias, role_1)
print '<--\n'
print '--> Create one predicateRole'
p16 = PredicateRole(role=role_2, rule=rule)
p16.save()
print "Predicate role 16 %s, id: %s" % (p16, p16.id)
str_rule = "(((%s&(%s|%s))|(%s&(%s|%s)))&(%s&%s&%s&%s&%s&%s&%s&%s&%s)|%s)" % (p1.id, p3.id, p4.id, p2.id, p5.id, p6.id, p7.id, p8.id, p9.id, p10.id, p11.id, p12.id, p13.id, p14.id, p15.id, p16.id)
rule.expression=str_rule
print "Added rule: %s" % rule
rule.save()
object_1_name = 'object_1_' + rdm_str
object_1 = add_object(requester, object_1_name, policy)
if not object_1:
raise CommandError('Unable to handle an object due to \
%s' % str(object_1))
object_2_name = 'object_2_' + rdm_str
object_2 = add_object(requester, object_2_name, policy)
if not object_2:
raise CommandError('Unable to handle an object due to \
%s' % str(object_1))
action_1_name = 'action_1_' + rdm_str
action_1 = add_action(requester, action_1_name, policy)
if not action_1:
raise CommandError('Unable to handle an action due to \
%s' % str(action_1))
action_2_name = 'action_2_' + rdm_str
action_2 = add_action(requester, action_2_name, policy)
if not action_2:
raise CommandError('Unable to handle an action due to \
%s' % str(action_1))
view_1_name = 'view_1_' + rdm_str
view_1 = add_view(requester, view_1_name, policy)
if not view_1:
raise CommandError('Unable to handle an view due to \
%s' % str(view_1))
view_2_name = 'view_2_' + rdm_str
view_2 = add_view(requester, view_2_name, policy)
if not view_2:
raise CommandError('Unable to handle an view due to \
%s' % str(view_2))
activity_1_name = 'activity_1_' + rdm_str
activity_1 = add_activity(requester, activity_1_name, policy)
if not activity_1:
raise CommandError('Unable to handle an activity due to \
%s' % str(activity_1))
activity_2_name = 'activity_2_' + rdm_str
activity_2 = add_activity(requester, activity_2_name, policy)
if not activity_1:
raise CommandError('Unable to handle an activity due to \
%s' % str(activity_2))
print '--> Add object 2 to view 2'
r = mod_view(requester, view_2, policy,
objects_added=[object_2])
if r < 0:
raise CommandError(\
'Unable to handle a view due to %s' % str(r))
print "Object %s added to %s" % (object_2, view_2)
print '<--\n'
print '--> Add object 1 and view 2 to view 1'
r = mod_view(requester, view_1, policy,
objects_added=[object_1], views_added=[view_2])
if r < 0:
raise CommandError(\
'Unable to handle a view due to %s' % str(r))
print "Object %s and view %s added to %s" % (object_1, view_2, view_1)
print '<--\n'
print '--> Add action 2 to activity 2'
r = mod_activity(requester, activity_2, policy,
actions_added=[action_2])
if r < 0:
raise CommandError(\
'Unable to handle an activity due to %s' % str(r))
print "Action %s added to %s" % (action_2, activity_2)
print '<--\n'
print '--> Add action 1 and activity 2 to view 1'
r = mod_activity(requester, activity_1, policy,
actions_added=[action_1], activities_added=[activity_2])
if r < 0:
raise CommandError(\
'Unable to handle an activity due to %s' % str(r))
print "Action %s and activity %s added to %s" % (action_1, activity_2, activity_1)
print '<--\n'
'''
Simulate the user session context
Add permission to policy
'''
AcsAbacPermission(who=None, what=view_2, how=activity_2, rule=rule).save()
'''
Here, read certificates
Create definitions and assertions on the fly and inject data in profile
Authentic will check that a certificate come from a trusted source
Only DIRECT trust source are supported for now.
The authentic will put in the session the source name
Then for each it will put in the session the attributes with name,
namespaces and values
0- From the certificat determine the corresponding source
Use Get source from name ()
1- From attribute name and namespace in certificates find the corresponding definition
Use Get definition from identifier and namespace ()
2- Create assertion coupling both
3- Create data with values and inject in profile
Request policy
'''
session = {}
session['certificates'] = {}
data = {}
data['attributes'] = []
attributes = {}
data = []
attr = {}
attr['name'] = 'certificate_type'
attr['namespace'] = 'ACS-ABAC'
attr['values'] = ('eID',)
data['attributes'].append(attr)
data.append(attr)
attr = {}
attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/dateofbirth'
attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
attr['values'] = ('22', )
data['attributes'].append(attr)
attr['values'] = ('17', )
data.append(attr)
attr = {}
attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/surname'
attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
attr['values'] = ('Ates',)
data['attributes'].append(attr)
data.append(attr)
attr = {}
attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/givenname'
attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
attr['values'] = ('Mikael', 'Ersin',)
data['attributes'].append(attr)
data.append(attr)
attr = {}
attr['name'] = 'Nationality'
attr['namespace'] = 'ISO7501-1'
attr['values'] = ('FRA',)
data['attributes'].append(attr)
session['certificates']['IdP1'] = data
data.append(attr)
attributes['IdP1'] = data
data = {}
data['attributes'] = []
data = []
attr = {}
attr['name'] = 'certificate_type'
attr['namespace'] = 'ACS-ABAC'
attr['values'] = ('driver_licence',)
data['attributes'].append(attr)
data.append(attr)
attr = {}
attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/surname'
attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
attr['values'] = ('Ates',)
data['attributes'].append(attr)
data.append(attr)
attr = {}
attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/givenname'
attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
attr['values'] = ('Mikael', 'Ersin',)
data['attributes'].append(attr)
data.append(attr)
attr = {}
attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/privatepersonalidentifier'
attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
attr['values'] = ('31q3sdf1q3sdf213q2d1f5qs',)
data['attributes'].append(attr)
data.append(attr)
# attr = {}
# attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/privatepersonalidentifier'
# attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
@ -381,82 +522,103 @@ class Command(BaseCommand):
attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/dateofbirth'
attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
attr['values'] = ('17', )
data['attributes'].append(attr)
session['certificates']['IdP2'] = data
data.append(attr)
attributes['IdP2'] = data
data = {}
data['attributes'] = []
data = []
attr = {}
attr['name'] = 'certificate_type'
attr['namespace'] = 'ACS-ABAC'
attr['values'] = ('assurance_assessment',)
data['attributes'].append(attr)
data.append(attr)
attr = {}
attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/surname'
attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
attr['values'] = ('Ates',)
data['attributes'].append(attr)
data.append(attr)
attr = {}
attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/givenname'
attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
attr['values'] = ('Mikael', 'Ersin', 'Bob')
data['attributes'].append(attr)
session['certificates']['IdP3'] = data
print "Found in session: %s" % session['certificates']
data.append(attr)
attributes['IdP3'] = data
profile = UserAttributeProfile()
profile.save()
decision, msg, error = \
is_authorized_by_names_with_abac(requestor_name=user.username,
who_name=user.username,
what_name=object_1_name,
how_name=action_1_name,
namespace_name=namespace,
attributes=attributes,
no_rule_returned=True)
if error < 0:
raise CommandError('is_authorized_by_names_with_abac returned %s' % str(error))
if decision:
print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
print "Access granted by permission %s" % msg
print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
else:
print "------------------------------------------------------"
print "Access denied, new rule to satisfy %s" % msg
print "------------------------------------------------------"
load_profile_by_dic(profile, session['certificates'])
decision, msg, error = \
is_authorized_by_names_with_abac(requestor_name=user.username,
who_name=user.username,
what_name=view_1_name, view=True,
how_name=activity_1_name, activity = True,
namespace_name=namespace,
attributes=attributes,
no_rule_returned=True)
if error < 0:
raise CommandError('is_authorized_by_names_with_abac returned %s' % str(error))
if decision:
print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
print "Access granted by permission %s" % msg
print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
else:
print "------------------------------------------------------"
print "Access denied, new rule to satisfy %s" % msg
print "------------------------------------------------------"
print "The profile contains:"
ads = AssertionData.objects.filter(profile=profile)
for ad in ads:
attribute_data = ad.attribute_data
print "From %s, definition %s" % (ad.source, attribute_data.definition.id)
if attribute_data.definition.attribute_type == ACS_XACML_DATATYPE_STRING:
values = StringM.objects.filter(data=attribute_data)
print "with values %s" % str([x.value for x in values])
elif attribute_data.definition.attribute_type == ACS_XACML_DATATYPE_INTEGER:
values = IntegerM.objects.filter(data=attribute_data)
print "with values %s" % str([x.value for x in values])
decision, msg, error = \
is_authorized_by_names_with_abac(requestor_name=user.username,
who_name=user.username,
what_name=object_2_name,
how_name=action_2_name,
namespace_name=namespace,
attributes=attributes,
no_rule_returned=True)
if error < 0:
raise CommandError('is_authorized_by_names_with_abac returned %s' % str(error))
if decision:
print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
print "Access granted by permission %s" % msg
print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
else:
print "------------------------------------------------------"
print "Access denied, new rule to satisfy %s" % msg
print "------------------------------------------------------"
decision, msg, error = \
is_authorized_by_names_with_abac(requestor_name=user.username,
who_name=user.username,
what_name=view_2_name, view=True,
how_name=activity_2_name, activity=True,
namespace_name=namespace,
attributes=attributes)
if error < 0:
raise CommandError('is_authorized_by_names_with_abac returned %s' % str(error))
if decision:
print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
print "Access granted by permission %s" % msg
print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
else:
print "------------------------------------------------------"
print "Access denied, new rule to satisfy %s" % msg
print "------------------------------------------------------"
'''
Check presence, check source, check value
List all predicate and make a dictionnary with the id as key
and True or False as value
'''
l = check_predicates(rule, profile)
print "Lets test %s with %s" % (rule.expression, l)
res = evaluation(rule.expression, l)
print "Authorized? %s" % res
if not res:
missing_p = return_sorted_variables_to_truth(rule.expression, l)
print "Missing: %s" % missing_p
new_rule = make_new_rule_from_missing_predicates(missing_p)
print "New rule: %s" % new_rule
# missing_p = arrange_missing_predicates(missing_p, profile)
# print "predicates arranged %s" % missing_p
rule.expression=new_rule
#Add an assertion to the profile
s = get_source_form_name('IdP1')
d = get_def_from_name_and_ns('http://schemas.xmlsoap.org/ws/2005/05/identity/claims/dateofbirth', 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims')
add_assertion_to_profile(profile, s, d, ('19',))
#check once again
l = check_predicates(rule, profile)
print "Lets test %s with %s" % (rule.expression, l)
res = evaluation(rule.expression, l)
print "Authorized? %s" % res
if not res:
missing_p = return_sorted_variables_to_truth(rule.expression, l)
print "Missing: %s" % missing_p
new_rule = make_new_rule_from_missing_predicates(missing_p)
print "New rule: %s" % new_rule
raise Exception('END')
raise Exception('Happy end')
except Exception, err:
print "Exception: %s" %str(err)