diff --git a/acs/management/commands/test-abac.py b/acs/management/commands/test-abac.py index 4a2b054..36c0118 100644 --- a/acs/management/commands/test-abac.py +++ b/acs/management/commands/test-abac.py @@ -17,7 +17,8 @@ along with this program. If not, see . ''' -from django.core.management.base import BaseCommand + +from django.core.management.base import BaseCommand, CommandError from django.db import transaction from acs.abac.models import * @@ -25,12 +26,20 @@ from acs.abac.core import check_predicates, \ arrange_missing_predicates, get_source_form_name, \ get_def_from_name_and_ns, add_assertion_to_profile, \ make_new_rule_from_missing_predicates, \ - get_attribute_definition_by_name, load_profile_by_dic + get_attribute_definition_by_name, load_profile_by_dic, \ + check_predicate_role from acs.abac.logic import evaluation, return_sorted_variables_to_truth from acs.xacml.constants import * +from acs.models import Role, UserAlias, AcsObject, Action, AcsAbacPermission + +from acs.core import create_policy, remove_policy, \ + add_role, mod_role, add_object, add_action, add_permission, \ + add_view, add_activity, mod_view, mod_activity, \ + is_authorized_by_names_with_abac + class Command(BaseCommand): ''' @@ -293,85 +302,217 @@ class Command(BaseCommand): p15.save() print "Predicate test comp 15 %s, id: %s" % (p15, p15.id) - str_rule = "((%s&(%s|%s))|(%s&(%s|%s)))&(%s&%s&%s&%s&%s&%s&%s&%s&%s)" % (p1.id, p3.id, p4.id, p2.id, p5.id, p6.id, p7.id, p8.id, p9.id, p10.id, p11.id, p12.id, p13.id, p14.id, p15.id) + print '--> Create a user' + rdm_str = ''.join(random.choice(string.ascii_uppercase + \ + string.digits) for x in range(8)) + username = 'user_' + rdm_str + user, created = User.objects.get_or_create(username=username) + if not user: + raise CommandError('Error creating user %s' % user) + user.set_password(username) + user.save() + print "User %s created" % user + print '<--\n' + + requester = User.objects.get(username='root') + + print '--> Create one policy' + name = 'policy_' + rdm_str + namespace = 'namespace_' + rdm_str + policy, created = create_policy(name, requester, + namespace=namespace) + if not policy: + raise CommandError('Error creating policy %s' % policy) + print "Policy %s created" % policy + print '<--\n' + + print '--> Set user in policy' + alias, created = UserAlias.objects.get_or_create(user=user, + alias=user.username, namespace=policy.namespace) + if not alias: + raise CommandError('Error creating alias %s' % alias) + print "Alias %s created" % alias + print '<--\n' + + print '--> Create two roles' + role_name_1 = 'role_1_' + rdm_str + role_1 = add_role(requester, role_name_1, policy) + if not role_1: + raise CommandError('Unable to handle a role due to %s' \ + % str(role_1)) + print "Role %s created" % role_1 + role_name_2 = 'role_2_' + rdm_str + role_2 = add_role(requester, role_name_2, policy) + if not role_2: + raise CommandError('Unable to handle a role due to %s' \ + % str(role_2)) + print "Role %s created" % role_2 + print '<--\n' + + print '--> Add role2 to role_1' + r = mod_role(requester, role_1, policy, + roles_added=[role_2]) + if r < 0: + raise CommandError(\ + 'Unable to handle a role due to %s' % str(r)) + print "Role %s added to %s" % (role_2, role_1) + print '<--\n' + + print '--> Add user to the role_1' + r = mod_role(requester, role_1, policy, + users_added=[alias]) + if r < 0: + raise CommandError(\ + 'Unable to handle a role due to %s' % str(r)) + print "User %s added to %s" % (alias, role_1) + print '<--\n' + + print '--> Create one predicateRole' + p16 = PredicateRole(role=role_2, rule=rule) + p16.save() + print "Predicate role 16 %s, id: %s" % (p16, p16.id) + + str_rule = "(((%s&(%s|%s))|(%s&(%s|%s)))&(%s&%s&%s&%s&%s&%s&%s&%s&%s)|%s)" % (p1.id, p3.id, p4.id, p2.id, p5.id, p6.id, p7.id, p8.id, p9.id, p10.id, p11.id, p12.id, p13.id, p14.id, p15.id, p16.id) rule.expression=str_rule - print "Added rule: %s" % rule + rule.save() + + object_1_name = 'object_1_' + rdm_str + object_1 = add_object(requester, object_1_name, policy) + if not object_1: + raise CommandError('Unable to handle an object due to \ + %s' % str(object_1)) + object_2_name = 'object_2_' + rdm_str + object_2 = add_object(requester, object_2_name, policy) + if not object_2: + raise CommandError('Unable to handle an object due to \ + %s' % str(object_1)) + + action_1_name = 'action_1_' + rdm_str + action_1 = add_action(requester, action_1_name, policy) + if not action_1: + raise CommandError('Unable to handle an action due to \ + %s' % str(action_1)) + action_2_name = 'action_2_' + rdm_str + action_2 = add_action(requester, action_2_name, policy) + if not action_2: + raise CommandError('Unable to handle an action due to \ + %s' % str(action_1)) + + view_1_name = 'view_1_' + rdm_str + view_1 = add_view(requester, view_1_name, policy) + if not view_1: + raise CommandError('Unable to handle an view due to \ + %s' % str(view_1)) + view_2_name = 'view_2_' + rdm_str + view_2 = add_view(requester, view_2_name, policy) + if not view_2: + raise CommandError('Unable to handle an view due to \ + %s' % str(view_2)) + + activity_1_name = 'activity_1_' + rdm_str + activity_1 = add_activity(requester, activity_1_name, policy) + if not activity_1: + raise CommandError('Unable to handle an activity due to \ + %s' % str(activity_1)) + activity_2_name = 'activity_2_' + rdm_str + activity_2 = add_activity(requester, activity_2_name, policy) + if not activity_1: + raise CommandError('Unable to handle an activity due to \ + %s' % str(activity_2)) + + print '--> Add object 2 to view 2' + r = mod_view(requester, view_2, policy, + objects_added=[object_2]) + if r < 0: + raise CommandError(\ + 'Unable to handle a view due to %s' % str(r)) + print "Object %s added to %s" % (object_2, view_2) + print '<--\n' + print '--> Add object 1 and view 2 to view 1' + r = mod_view(requester, view_1, policy, + objects_added=[object_1], views_added=[view_2]) + if r < 0: + raise CommandError(\ + 'Unable to handle a view due to %s' % str(r)) + print "Object %s and view %s added to %s" % (object_1, view_2, view_1) + print '<--\n' + + print '--> Add action 2 to activity 2' + r = mod_activity(requester, activity_2, policy, + actions_added=[action_2]) + if r < 0: + raise CommandError(\ + 'Unable to handle an activity due to %s' % str(r)) + print "Action %s added to %s" % (action_2, activity_2) + print '<--\n' + print '--> Add action 1 and activity 2 to view 1' + r = mod_activity(requester, activity_1, policy, + actions_added=[action_1], activities_added=[activity_2]) + if r < 0: + raise CommandError(\ + 'Unable to handle an activity due to %s' % str(r)) + print "Action %s and activity %s added to %s" % (action_1, activity_2, activity_1) + print '<--\n' ''' - Simulate the user session context + Add permission to policy ''' + AcsAbacPermission(who=None, what=view_2, how=activity_2, rule=rule).save() ''' - Here, read certificates - Create definitions and assertions on the fly and inject data in profile - - Authentic will check that a certificate come from a trusted source - Only DIRECT trust source are supported for now. - The authentic will put in the session the source name - Then for each it will put in the session the attributes with name, - namespaces and values - - 0- From the certificat determine the corresponding source - Use Get source from name () - 1- From attribute name and namespace in certificates find the corresponding definition - Use Get definition from identifier and namespace () - 2- Create assertion coupling both - 3- Create data with values and inject in profile + Request policy ''' - session = {} - session['certificates'] = {} - data = {} - data['attributes'] = [] + attributes = {} + data = [] attr = {} attr['name'] = 'certificate_type' attr['namespace'] = 'ACS-ABAC' attr['values'] = ('eID',) - data['attributes'].append(attr) + data.append(attr) attr = {} attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/dateofbirth' attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims' - attr['values'] = ('22', ) - data['attributes'].append(attr) + attr['values'] = ('17', ) + data.append(attr) attr = {} attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/surname' attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims' attr['values'] = ('Ates',) - data['attributes'].append(attr) + data.append(attr) attr = {} attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/givenname' attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims' attr['values'] = ('Mikael', 'Ersin',) - data['attributes'].append(attr) + data.append(attr) attr = {} attr['name'] = 'Nationality' attr['namespace'] = 'ISO7501-1' attr['values'] = ('FRA',) - data['attributes'].append(attr) - session['certificates']['IdP1'] = data + data.append(attr) + attributes['IdP1'] = data - data = {} - data['attributes'] = [] + data = [] attr = {} attr['name'] = 'certificate_type' attr['namespace'] = 'ACS-ABAC' attr['values'] = ('driver_licence',) - data['attributes'].append(attr) + data.append(attr) attr = {} attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/surname' attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims' attr['values'] = ('Ates',) - data['attributes'].append(attr) + data.append(attr) attr = {} attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/givenname' attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims' attr['values'] = ('Mikael', 'Ersin',) - data['attributes'].append(attr) + data.append(attr) attr = {} attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/privatepersonalidentifier' attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims' attr['values'] = ('31q3sdf1q3sdf213q2d1f5qs',) - data['attributes'].append(attr) + data.append(attr) # attr = {} # attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/privatepersonalidentifier' # attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims' @@ -381,82 +522,103 @@ class Command(BaseCommand): attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/dateofbirth' attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims' attr['values'] = ('17', ) - data['attributes'].append(attr) - session['certificates']['IdP2'] = data + data.append(attr) + attributes['IdP2'] = data - data = {} - data['attributes'] = [] + data = [] attr = {} attr['name'] = 'certificate_type' attr['namespace'] = 'ACS-ABAC' attr['values'] = ('assurance_assessment',) - data['attributes'].append(attr) + data.append(attr) attr = {} attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/surname' attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims' attr['values'] = ('Ates',) - data['attributes'].append(attr) + data.append(attr) attr = {} attr['name'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims/givenname' attr['namespace'] = 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims' attr['values'] = ('Mikael', 'Ersin', 'Bob') - data['attributes'].append(attr) - session['certificates']['IdP3'] = data - print "Found in session: %s" % session['certificates'] + data.append(attr) + attributes['IdP3'] = data - profile = UserAttributeProfile() - profile.save() + decision, msg, error = \ + is_authorized_by_names_with_abac(requestor_name=user.username, + who_name=user.username, + what_name=object_1_name, + how_name=action_1_name, + namespace_name=namespace, + attributes=attributes, + no_rule_returned=True) + if error < 0: + raise CommandError('is_authorized_by_names_with_abac returned %s' % str(error)) + if decision: + print "++++++++++++++++++++++++++++++++++++++++++++++++++++++" + print "Access granted by permission %s" % msg + print "++++++++++++++++++++++++++++++++++++++++++++++++++++++" + else: + print "------------------------------------------------------" + print "Access denied, new rule to satisfy %s" % msg + print "------------------------------------------------------" - load_profile_by_dic(profile, session['certificates']) + decision, msg, error = \ + is_authorized_by_names_with_abac(requestor_name=user.username, + who_name=user.username, + what_name=view_1_name, view=True, + how_name=activity_1_name, activity = True, + namespace_name=namespace, + attributes=attributes, + no_rule_returned=True) + if error < 0: + raise CommandError('is_authorized_by_names_with_abac returned %s' % str(error)) + if decision: + print "++++++++++++++++++++++++++++++++++++++++++++++++++++++" + print "Access granted by permission %s" % msg + print "++++++++++++++++++++++++++++++++++++++++++++++++++++++" + else: + print "------------------------------------------------------" + print "Access denied, new rule to satisfy %s" % msg + print "------------------------------------------------------" - print "The profile contains:" - ads = AssertionData.objects.filter(profile=profile) - for ad in ads: - attribute_data = ad.attribute_data - print "From %s, definition %s" % (ad.source, attribute_data.definition.id) - if attribute_data.definition.attribute_type == ACS_XACML_DATATYPE_STRING: - values = StringM.objects.filter(data=attribute_data) - print "with values %s" % str([x.value for x in values]) - elif attribute_data.definition.attribute_type == ACS_XACML_DATATYPE_INTEGER: - values = IntegerM.objects.filter(data=attribute_data) - print "with values %s" % str([x.value for x in values]) + decision, msg, error = \ + is_authorized_by_names_with_abac(requestor_name=user.username, + who_name=user.username, + what_name=object_2_name, + how_name=action_2_name, + namespace_name=namespace, + attributes=attributes, + no_rule_returned=True) + if error < 0: + raise CommandError('is_authorized_by_names_with_abac returned %s' % str(error)) + if decision: + print "++++++++++++++++++++++++++++++++++++++++++++++++++++++" + print "Access granted by permission %s" % msg + print "++++++++++++++++++++++++++++++++++++++++++++++++++++++" + else: + print "------------------------------------------------------" + print "Access denied, new rule to satisfy %s" % msg + print "------------------------------------------------------" + decision, msg, error = \ + is_authorized_by_names_with_abac(requestor_name=user.username, + who_name=user.username, + what_name=view_2_name, view=True, + how_name=activity_2_name, activity=True, + namespace_name=namespace, + attributes=attributes) + if error < 0: + raise CommandError('is_authorized_by_names_with_abac returned %s' % str(error)) + if decision: + print "++++++++++++++++++++++++++++++++++++++++++++++++++++++" + print "Access granted by permission %s" % msg + print "++++++++++++++++++++++++++++++++++++++++++++++++++++++" + else: + print "------------------------------------------------------" + print "Access denied, new rule to satisfy %s" % msg + print "------------------------------------------------------" - ''' - Check presence, check source, check value - List all predicate and make a dictionnary with the id as key - and True or False as value - ''' - l = check_predicates(rule, profile) - print "Lets test %s with %s" % (rule.expression, l) - res = evaluation(rule.expression, l) - print "Authorized? %s" % res - if not res: - missing_p = return_sorted_variables_to_truth(rule.expression, l) - print "Missing: %s" % missing_p - new_rule = make_new_rule_from_missing_predicates(missing_p) - print "New rule: %s" % new_rule - # missing_p = arrange_missing_predicates(missing_p, profile) - # print "predicates arranged %s" % missing_p - rule.expression=new_rule - - #Add an assertion to the profile - s = get_source_form_name('IdP1') - d = get_def_from_name_and_ns('http://schemas.xmlsoap.org/ws/2005/05/identity/claims/dateofbirth', 'http://schemas.xmlsoap.org/ws/2005/05/identity/claims') - add_assertion_to_profile(profile, s, d, ('19',)) - - #check once again - l = check_predicates(rule, profile) - print "Lets test %s with %s" % (rule.expression, l) - res = evaluation(rule.expression, l) - print "Authorized? %s" % res - if not res: - missing_p = return_sorted_variables_to_truth(rule.expression, l) - print "Missing: %s" % missing_p - new_rule = make_new_rule_from_missing_predicates(missing_p) - print "New rule: %s" % new_rule - - raise Exception('END') + raise Exception('Happy end') except Exception, err: print "Exception: %s" %str(err)