New abac test script
This commit is contained in:
parent
bda42675e0
commit
4abdb74719
|
@ -17,7 +17,8 @@
|
|||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
'''
|
||||
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
from django.db import transaction
|
||||
|
||||
from acs.abac.models import *
|
||||
|
@ -25,12 +26,20 @@ from acs.abac.core import check_predicates, \
|
|||
arrange_missing_predicates, get_source_form_name, \
|
||||
get_def_from_name_and_ns, add_assertion_to_profile, \
|
||||
make_new_rule_from_missing_predicates, \
|
||||
get_attribute_definition_by_name, load_profile_by_dic
|
||||
get_attribute_definition_by_name, load_profile_by_dic, \
|
||||
check_predicate_role
|
||||
|
||||
from acs.abac.logic import evaluation, return_sorted_variables_to_truth
|
||||
|
||||
from acs.xacml.constants import *
|
||||
|
||||
from acs.models import Role, UserAlias, AcsObject, Action, AcsAbacPermission
|
||||
|
||||
from acs.core import create_policy, remove_policy, \
|
||||
add_role, mod_role, add_object, add_action, add_permission, \
|
||||
add_view, add_activity, mod_view, mod_activity, \
|
||||
is_authorized_by_names_with_abac
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
'''
|
||||
|
@ -293,85 +302,217 @@ class Command(BaseCommand):
|
|||
p15.save()
|
||||
print "Predicate test comp 15 %s, id: %s" % (p15, p15.id)
|
||||
|
||||
str_rule = "((%s&(%s|%s))|(%s&(%s|%s)))&(%s&%s&%s&%s&%s&%s&%s&%s&%s)" % (p1.id, p3.id, p4.id, p2.id, p5.id, p6.id, p7.id, p8.id, p9.id, p10.id, p11.id, p12.id, p13.id, p14.id, p15.id)
|
||||
print '--> Create a user'
|
||||
rdm_str = ''.join(random.choice(string.ascii_uppercase + \
|
||||
string.digits) for x in range(8))
|
||||
username = 'user_' + rdm_str
|
||||
user, created = User.objects.get_or_create(username=username)
|
||||
if not user:
|
||||
raise CommandError('Error creating user %s' % user)
|
||||
user.set_password(username)
|
||||
user.save()
|
||||
print "User %s created" % user
|
||||
print '<--\n'
|
||||
|
||||
requester = User.objects.get(username='root')
|
||||
|
||||
print '--> Create one policy'
|
||||
name = 'policy_' + rdm_str
|
||||
namespace = 'namespace_' + rdm_str
|
||||
policy, created = create_policy(name, requester,
|
||||
namespace=namespace)
|
||||
if not policy:
|
||||
raise CommandError('Error creating policy %s' % policy)
|
||||
print "Policy %s created" % policy
|
||||
print '<--\n'
|
||||
|
||||
print '--> Set user in policy'
|
||||
alias, created = UserAlias.objects.get_or_create(user=user,
|
||||
alias=user.username, namespace=policy.namespace)
|
||||
if not alias:
|
||||
raise CommandError('Error creating alias %s' % alias)
|
||||
print "Alias %s created" % alias
|
||||
print '<--\n'
|
||||
|
||||
print '--> Create two roles'
|
||||
role_name_1 = 'role_1_' + rdm_str
|
||||
role_1 = add_role(requester, role_name_1, policy)
|
||||
if not role_1:
|
||||
raise CommandError('Unable to handle a role due to %s' \
|
||||
% str(role_1))
|
||||
print "Role %s created" % role_1
|
||||
role_name_2 = 'role_2_' + rdm_str
|
||||
role_2 = add_role(requester, role_name_2, policy)
|
||||
if not role_2:
|
||||
raise CommandError('Unable to handle a role due to %s' \
|
||||
% str(role_2))
|
||||
print "Role %s created" % role_2
|
||||
print '<--\n'
|
||||
|
||||
print '--> Add role2 to role_1'
|
||||
r = mod_role(requester, role_1, policy,
|
||||
roles_added=[role_2])
|
||||
if r < 0:
|
||||
raise CommandError(\
|
||||
'Unable to handle a role due to %s' % str(r))
|
||||
print "Role %s added to %s" % (role_2, role_1)
|
||||
print '<--\n'
|
||||
|
||||
print '--> Add user to the role_1'
|
||||
r = mod_role(requester, role_1, policy,
|
||||
users_added=[alias])
|
||||
if r < 0:
|
||||
raise CommandError(\
|
||||
'Unable to handle a role due to %s' % str(r))
|
||||
print "User %s added to %s" % (alias, role_1)
|
||||
print '<--\n'
|
||||
|
||||
print '--> Create one predicateRole'
|
||||
p16 = PredicateRole(role=role_2, rule=rule)
|
||||
p16.save()
|
||||
print "Predicate role 16 %s, id: %s" % (p16, p16.id)
|
||||
|
||||
str_rule = "(((%s&(%s|%s))|(%s&(%s|%s)))&(%s&%s&%s&%s&%s&%s&%s&%s&%s)|%s)" % (p1.id, p3.id, p4.id, p2.id, p5.id, p6.id, p7.id, p8.id, p9.id, p10.id, p11.id, p12.id, p13.id, p14.id, p15.id, p16.id)
|
||||
rule.expression=str_rule
|
||||
print "Added rule: %s" % rule
|
||||
rule.save()
|
||||
|
||||
object_1_name = 'object_1_' + rdm_str
|
||||
object_1 = add_object(requester, object_1_name, policy)
|
||||
if not object_1:
|
||||
raise CommandError('Unable to handle an object due to \
|
||||
%s' % str(object_1))
|
||||
object_2_name = 'object_2_' + rdm_str
|
||||
object_2 = add_object(requester, object_2_name, policy)
|
||||
if not object_2:
|
||||
raise CommandError('Unable to handle an object due to \
|
||||
%s' % str(object_1))
|
||||
|
||||
action_1_name = 'action_1_' + rdm_str
|
||||
action_1 = add_action(requester, action_1_name, policy)
|
||||
if not action_1:
|
||||
raise CommandError('Unable to handle an action due to \
|
||||
%s' % str(action_1))
|
||||
action_2_name = 'action_2_' + rdm_str
|
||||
action_2 = add_action(requester, action_2_name, policy)
|
||||
if not action_2:
|
||||
raise CommandError('Unable to handle an action due to \
|
||||
%s' % str(action_1))
|
||||
|
||||
view_1_name = 'view_1_' + rdm_str
|
||||
view_1 = add_view(requester, view_1_name, policy)
|
||||
if not view_1:
|
||||
raise CommandError('Unable to handle an view due to \
|
||||
%s' % str(view_1))
|
||||
view_2_name = 'view_2_' + rdm_str
|
||||
view_2 = add_view(requester, view_2_name, policy)
|
||||
if not view_2:
|
||||
raise CommandError('Unable to handle an view due to \
|
||||
%s' % str(view_2))
|
||||
|
||||
activity_1_name = 'activity_1_' + rdm_str
|
||||
activity_1 = add_activity(requester, activity_1_name, policy)
|
||||
if not activity_1:
|
||||
raise CommandError('Unable to handle an activity due to \
|
||||
%s' % str(activity_1))
|
||||
activity_2_name = 'activity_2_' + rdm_str
|
||||
activity_2 = add_activity(requester, activity_2_name, policy)
|
||||
if not activity_1:
|
||||
raise CommandError('Unable to handle an activity due to \
|
||||
%s' % str(activity_2))
|
||||
|
||||
print '--> Add object 2 to view 2'
|
||||
r = mod_view(requester, view_2, policy,
|
||||
objects_added=[object_2])
|
||||
if r < 0:
|
||||
raise CommandError(\
|
||||
'Unable to handle a view due to %s' % str(r))
|
||||
print "Object %s added to %s" % (object_2, view_2)
|
||||
print '<--\n'
|
||||
print '--> Add object 1 and view 2 to view 1'
|
||||
r = mod_view(requester, view_1, policy,
|
||||
objects_added=[object_1], views_added=[view_2])
|
||||
if r < 0:
|
||||
raise CommandError(\
|
||||
'Unable to handle a view due to %s' % str(r))
|
||||
print "Object %s and view %s added to %s" % (object_1, view_2, view_1)
|
||||
print '<--\n'
|
||||
|
||||
print '--> Add action 2 to activity 2'
|
||||
r = mod_activity(requester, activity_2, policy,
|
||||
actions_added=[action_2])
|
||||
if r < 0:
|
||||
raise CommandError(\
|
||||
'Unable to handle an activity due to %s' % str(r))
|
||||
print "Action %s added to %s" % (action_2, activity_2)
|
||||
print '<--\n'
|
||||
print '--> Add action 1 and activity 2 to view 1'
|
||||
r = mod_activity(requester, activity_1, policy,
|
||||
actions_added=[action_1], activities_added=[activity_2])
|
||||
if r < 0:
|
||||
raise CommandError(\
|
||||
'Unable to handle an activity due to %s' % str(r))
|
||||
print "Action %s and activity %s added to %s" % (action_1, activity_2, activity_1)
|
||||
print '<--\n'
|
||||
|
||||
'''
|
||||
Simulate the user session context
|
||||
Add permission to policy
|
||||
'''
|
||||
AcsAbacPermission(who=None, what=view_2, how=activity_2, rule=rule).save()
|
||||
|
||||
'''
|
||||
Here, read certificates
|
||||
Create definitions and assertions on the fly and inject data in profile
|
||||
|
||||
Authentic will check that a certificate come from a trusted source
|
||||
Only DIRECT trust source are supported for now.
|
||||
The authentic will put in the session the source name
|
||||
Then for each it will put in the session the attributes with name,
|
||||
namespaces and values
|
||||
|
||||
0- From the certificat determine the corresponding source
|
||||
Use Get source from name ()
|
||||
1- From attribute name and namespace in certificates find the corresponding definition
|
||||
Use Get definition from identifier and namespace ()
|
||||
2- Create assertion coupling both
|
||||
3- Create data with values and inject in profile
|
||||
Request policy
|
||||
'''
|
||||
|
||||
session = {}
|
||||
session['certificates'] = {}
|
||||
data = {}
|
||||
data['attributes'] = []
|
||||
attributes = {}
|
||||
data = []
|
||||
attr = {}
|
||||
attr['name'] = 'certificate_type'
|
||||
attr['namespace'] = 'ACS-ABAC'
|
||||
attr['values'] = ('eID',)
|
||||
data['attributes'].append(attr)
|
||||
data.append(attr)
|
||||
attr = {}
|
||||
attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/dateofbirth'
|
||||
attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
|
||||
attr['values'] = ('22', )
|
||||
data['attributes'].append(attr)
|
||||
attr['values'] = ('17', )
|
||||
data.append(attr)
|
||||
attr = {}
|
||||
attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/surname'
|
||||
attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
|
||||
attr['values'] = ('Ates',)
|
||||
data['attributes'].append(attr)
|
||||
data.append(attr)
|
||||
attr = {}
|
||||
attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/givenname'
|
||||
attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
|
||||
attr['values'] = ('Mikael', 'Ersin',)
|
||||
data['attributes'].append(attr)
|
||||
data.append(attr)
|
||||
attr = {}
|
||||
attr['name'] = 'Nationality'
|
||||
attr['namespace'] = 'ISO7501-1'
|
||||
attr['values'] = ('FRA',)
|
||||
data['attributes'].append(attr)
|
||||
session['certificates']['IdP1'] = data
|
||||
data.append(attr)
|
||||
attributes['IdP1'] = data
|
||||
|
||||
data = {}
|
||||
data['attributes'] = []
|
||||
data = []
|
||||
attr = {}
|
||||
attr['name'] = 'certificate_type'
|
||||
attr['namespace'] = 'ACS-ABAC'
|
||||
attr['values'] = ('driver_licence',)
|
||||
data['attributes'].append(attr)
|
||||
data.append(attr)
|
||||
attr = {}
|
||||
attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/surname'
|
||||
attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
|
||||
attr['values'] = ('Ates',)
|
||||
data['attributes'].append(attr)
|
||||
data.append(attr)
|
||||
attr = {}
|
||||
attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/givenname'
|
||||
attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
|
||||
attr['values'] = ('Mikael', 'Ersin',)
|
||||
data['attributes'].append(attr)
|
||||
data.append(attr)
|
||||
attr = {}
|
||||
attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/privatepersonalidentifier'
|
||||
attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
|
||||
attr['values'] = ('31q3sdf1q3sdf213q2d1f5qs',)
|
||||
data['attributes'].append(attr)
|
||||
data.append(attr)
|
||||
# attr = {}
|
||||
# attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/privatepersonalidentifier'
|
||||
# attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
|
||||
|
@ -381,82 +522,103 @@ class Command(BaseCommand):
|
|||
attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/dateofbirth'
|
||||
attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
|
||||
attr['values'] = ('17', )
|
||||
data['attributes'].append(attr)
|
||||
session['certificates']['IdP2'] = data
|
||||
data.append(attr)
|
||||
attributes['IdP2'] = data
|
||||
|
||||
data = {}
|
||||
data['attributes'] = []
|
||||
data = []
|
||||
attr = {}
|
||||
attr['name'] = 'certificate_type'
|
||||
attr['namespace'] = 'ACS-ABAC'
|
||||
attr['values'] = ('assurance_assessment',)
|
||||
data['attributes'].append(attr)
|
||||
data.append(attr)
|
||||
attr = {}
|
||||
attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/surname'
|
||||
attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
|
||||
attr['values'] = ('Ates',)
|
||||
data['attributes'].append(attr)
|
||||
data.append(attr)
|
||||
attr = {}
|
||||
attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/givenname'
|
||||
attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims'
|
||||
attr['values'] = ('Mikael', 'Ersin', 'Bob')
|
||||
data['attributes'].append(attr)
|
||||
session['certificates']['IdP3'] = data
|
||||
print "Found in session: %s" % session['certificates']
|
||||
data.append(attr)
|
||||
attributes['IdP3'] = data
|
||||
|
||||
profile = UserAttributeProfile()
|
||||
profile.save()
|
||||
decision, msg, error = \
|
||||
is_authorized_by_names_with_abac(requestor_name=user.username,
|
||||
who_name=user.username,
|
||||
what_name=object_1_name,
|
||||
how_name=action_1_name,
|
||||
namespace_name=namespace,
|
||||
attributes=attributes,
|
||||
no_rule_returned=True)
|
||||
if error < 0:
|
||||
raise CommandError('is_authorized_by_names_with_abac returned %s' % str(error))
|
||||
if decision:
|
||||
print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
|
||||
print "Access granted by permission %s" % msg
|
||||
print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
|
||||
else:
|
||||
print "------------------------------------------------------"
|
||||
print "Access denied, new rule to satisfy %s" % msg
|
||||
print "------------------------------------------------------"
|
||||
|
||||
load_profile_by_dic(profile, session['certificates'])
|
||||
decision, msg, error = \
|
||||
is_authorized_by_names_with_abac(requestor_name=user.username,
|
||||
who_name=user.username,
|
||||
what_name=view_1_name, view=True,
|
||||
how_name=activity_1_name, activity = True,
|
||||
namespace_name=namespace,
|
||||
attributes=attributes,
|
||||
no_rule_returned=True)
|
||||
if error < 0:
|
||||
raise CommandError('is_authorized_by_names_with_abac returned %s' % str(error))
|
||||
if decision:
|
||||
print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
|
||||
print "Access granted by permission %s" % msg
|
||||
print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
|
||||
else:
|
||||
print "------------------------------------------------------"
|
||||
print "Access denied, new rule to satisfy %s" % msg
|
||||
print "------------------------------------------------------"
|
||||
|
||||
print "The profile contains:"
|
||||
ads = AssertionData.objects.filter(profile=profile)
|
||||
for ad in ads:
|
||||
attribute_data = ad.attribute_data
|
||||
print "From %s, definition %s" % (ad.source, attribute_data.definition.id)
|
||||
if attribute_data.definition.attribute_type == ACS_XACML_DATATYPE_STRING:
|
||||
values = StringM.objects.filter(data=attribute_data)
|
||||
print "with values %s" % str([x.value for x in values])
|
||||
elif attribute_data.definition.attribute_type == ACS_XACML_DATATYPE_INTEGER:
|
||||
values = IntegerM.objects.filter(data=attribute_data)
|
||||
print "with values %s" % str([x.value for x in values])
|
||||
decision, msg, error = \
|
||||
is_authorized_by_names_with_abac(requestor_name=user.username,
|
||||
who_name=user.username,
|
||||
what_name=object_2_name,
|
||||
how_name=action_2_name,
|
||||
namespace_name=namespace,
|
||||
attributes=attributes,
|
||||
no_rule_returned=True)
|
||||
if error < 0:
|
||||
raise CommandError('is_authorized_by_names_with_abac returned %s' % str(error))
|
||||
if decision:
|
||||
print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
|
||||
print "Access granted by permission %s" % msg
|
||||
print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
|
||||
else:
|
||||
print "------------------------------------------------------"
|
||||
print "Access denied, new rule to satisfy %s" % msg
|
||||
print "------------------------------------------------------"
|
||||
|
||||
decision, msg, error = \
|
||||
is_authorized_by_names_with_abac(requestor_name=user.username,
|
||||
who_name=user.username,
|
||||
what_name=view_2_name, view=True,
|
||||
how_name=activity_2_name, activity=True,
|
||||
namespace_name=namespace,
|
||||
attributes=attributes)
|
||||
if error < 0:
|
||||
raise CommandError('is_authorized_by_names_with_abac returned %s' % str(error))
|
||||
if decision:
|
||||
print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
|
||||
print "Access granted by permission %s" % msg
|
||||
print "++++++++++++++++++++++++++++++++++++++++++++++++++++++"
|
||||
else:
|
||||
print "------------------------------------------------------"
|
||||
print "Access denied, new rule to satisfy %s" % msg
|
||||
print "------------------------------------------------------"
|
||||
|
||||
'''
|
||||
Check presence, check source, check value
|
||||
List all predicate and make a dictionnary with the id as key
|
||||
and True or False as value
|
||||
'''
|
||||
l = check_predicates(rule, profile)
|
||||
print "Lets test %s with %s" % (rule.expression, l)
|
||||
res = evaluation(rule.expression, l)
|
||||
print "Authorized? %s" % res
|
||||
if not res:
|
||||
missing_p = return_sorted_variables_to_truth(rule.expression, l)
|
||||
print "Missing: %s" % missing_p
|
||||
new_rule = make_new_rule_from_missing_predicates(missing_p)
|
||||
print "New rule: %s" % new_rule
|
||||
# missing_p = arrange_missing_predicates(missing_p, profile)
|
||||
# print "predicates arranged %s" % missing_p
|
||||
rule.expression=new_rule
|
||||
|
||||
#Add an assertion to the profile
|
||||
s = get_source_form_name('IdP1')
|
||||
d = get_def_from_name_and_ns('http://schemas.xmlsoap.org/ws/2005/05/identity/claims/dateofbirth', 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims')
|
||||
add_assertion_to_profile(profile, s, d, ('19',))
|
||||
|
||||
#check once again
|
||||
l = check_predicates(rule, profile)
|
||||
print "Lets test %s with %s" % (rule.expression, l)
|
||||
res = evaluation(rule.expression, l)
|
||||
print "Authorized? %s" % res
|
||||
if not res:
|
||||
missing_p = return_sorted_variables_to_truth(rule.expression, l)
|
||||
print "Missing: %s" % missing_p
|
||||
new_rule = make_new_rule_from_missing_predicates(missing_p)
|
||||
print "New rule: %s" % new_rule
|
||||
|
||||
raise Exception('END')
|
||||
raise Exception('Happy end')
|
||||
|
||||
except Exception, err:
|
||||
print "Exception: %s" %str(err)
|
||||
|
|
Reference in New Issue