A unique script is now used for initialization including abac

This commit is contained in:
Mikaël Ates 2011-08-23 16:55:20 +02:00
parent 3ca2307646
commit a7cfe5423c
2 changed files with 296 additions and 284 deletions

View File

@ -18,29 +18,71 @@
'''
import getpass
import json
from pprint import pprint
from optparse import make_option
from django.db import transaction
from django.core.management.base import BaseCommand, CommandError
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
from acs import settings
from acs.models import View, AcsPermission, Namespace, Action, \
UserAlias, Role
from acs.abac.models import AttributeNamespace, AttributeDefinition, \
AttributeMapInNamespace, AttributeName
from acs.abac.core import get_namespace_by_identifier, \
get_namespace_by_friendly_name
class Command(BaseCommand):
'''
Initialize Acs with a root user with all rights.
Rerun with: python manage.py sqlclear acs | python manage.py dbshell \
&& python manage.py syncdb \
&& python manage.py initialize-acs <username> [--existing]
Initialization script:
- Initialize Acs.
- Create a root user with all rights.
- Inject the attribute definitions in the database.
Names hardly encoded:
Namespace:Default,
Action:administration,
Role:root_admin_role, root_user_administrator_role,
root_abac_administrator_role
View: root_admin_view
Run with:
python manage.py initialize-acs <username> [--existing] \
[--without-abac] [--reload-abac]
--existing
The user already exists in the database.
Else, the user is created.
--reload-abac
Skip the initial configuration and update the
attribute definitions.
*** Be sure to use --reload-abac option if it is not the first
time the script is run and it is not expected to reinitialize
ACS ***
--without-abac
Only the initial configuration.
Reinitialization:
python manage.py sqlclear acs | python manage.py dbshell \
&& python manage.py syncdb \
&& python manage.py initialize-acs <username> [--existing] \
[--without-abac] [--reload-abac]
If you want to clear all the attribute definitions use
python manage.py sqlclear abac
WARNING: The reinitialization, or re-running the initialization
script, assume that you want to reset ACS and you will loose all
policies.
Objects with their names hardly encoded in the script and created in
the database:
Namespace: 'Default',
Action: 'administration',
Roles: 'root_admin_role', 'root_user_administrator_role',
'root_abac_administrator_role'
View: 'root_admin_view'
'''
can_import_django_settings = True
@ -52,6 +94,18 @@ class Command(BaseCommand):
dest='existing',
default=False,
help='Indicate if it is an existing user'),
) + (
make_option('--without-abac',
action='store_true',
dest='without-abac',
default=False,
help='Indicate if it is an existing user'),
) + (
make_option('--reload-abac',
action='store_true',
dest='reload-abac',
default=False,
help='Indicate if it is an existing user'),
)
args = '<username>'
help = \
@ -59,146 +113,253 @@ class Command(BaseCommand):
@transaction.commit_manually
def handle(self, *args, **options):
try:
if not args:
raise CommandError('No username on the command line')
username = args[0]
user = None
if options['existing']:
print 'Look for the existing user %s' %username
if not options['reload-abac']:
try:
if not args:
raise CommandError('No username on the command line')
username = args[0]
user = None
if options['existing']:
print 'Look for the existing user %s' %username
try:
user = User.objects.get(username=username)
print 'User found: %s' %user
except:
raise CommandError('Unable to get existing user %s' \
%username)
else:
print 'Creation of user %s' %username
try:
user = User.objects.get(username=username)
except:
pass
if user:
raise CommandError('Already existing user %s' %user)
MAX_TRIES = 3
count = 0
p1, p2 = 1, 2
while p1 != p2 and count < MAX_TRIES:
p1 = getpass.getpass(prompt="Password: ")
if not p1:
raise CommandError("aborted")
p2 = getpass.getpass(prompt="Password (again): ")
if not p2:
raise CommandError("aborted")
if p1 != p2:
print "Passwords do not match. Please try again."
count = count + 1
if count == MAX_TRIES:
raise CommandError("Aborting creation of user '%s' after \
%s attempts" % (username, count))
try:
user = User(username=username)
user.set_password(p1)
user.save()
print 'User created: %s' %user
except:
raise CommandError('Unable to create user %s' %username)
ns = None
try:
user = User.objects.get(username=username)
print 'User found: %s' %user
ns = Namespace(name='Default')
ns.save()
print 'Default namespace created: %s' %ns
except:
raise CommandError('Unable to get existing user %s' \
%username)
raise CommandError('Unable to create the default namespace')
alias = None
try:
'''The alias in Default has the username as name'''
alias = UserAlias(alias=user.username,
user=user, namespace=ns)
alias.save()
print 'Default alias created: %s' %alias
except:
raise CommandError('Unable to create the default alias')
#view_name = user.username+'_admin_view'
view_name = 'root_admin_view'
view = None
try:
view = View(name=view_name, namespace=ns)
view.save()
print 'Root system view created: %s' %view
except:
raise CommandError('Unable to create root system view')
view.users.add(alias)
print \
'Default alias of the root user added to its root system view'
#role_name = user.username+'_admin_role'
role_name = 'root_admin_role'
role=None
try:
role = Role(name=role_name, namespace=ns)
role.save()
print 'Root system role created: %s' %role
except:
raise CommandError('Unable to create root system role')
view.roles.add(role)
print 'Default role added to the root system view'
a = None
try:
a = Action(name='administration')
a.save()
print 'Administration action created: %s' %a
except:
raise \
CommandError('Unable to create the administration action')
p = None
try:
p = AcsPermission(who=role, what=view, how=a)
p.save()
print 'Root administration permission created: %s' %p
except:
raise \
CommandError('Unable to create the root administration \
permission for the root administration role')
role.users.add(alias)
print 'User added to the root role'
ua = None
try:
ua = Role(name='root_user_administrator_role')
ua.save()
print 'Special role user administrator created: %s' %ua
except:
raise \
CommandError('Unable to create special \
role user administrator')
ua = None
try:
ua = Role(name='root_abac_administrator_role')
ua.save()
print 'Special role abac administrator created: %s' %ua
except:
raise \
CommandError('Unable to create special \
role abac administrator')
except:
transaction.rollback()
raise
else:
print 'Creation of user %s' %username
transaction.commit()
print '---> Successful main initialization'
if options['without-abac']:
raise CommandError('No ABAC, exit')
try:
print '---> Start Attribute Based Access Control application initialisation'
json_data = open('acs/abac/namespaces.json')
data = json.load(json_data)
print '--> Namespace file contains:'
pprint(data)
for ns in data:
print 'Looking for ns with friendly name %s and identifier %s' \
% (ns, data[ns])
ok = False
try:
user = User.objects.get(username=username)
except:
pass
if user:
raise CommandError('Already existing user %s' %user)
AttributeNamespace.objects.get(friendly_name=ns)
print '- existing namespace with friendly name %s, not updated' %ns
except ObjectDoesNotExist:
ok = True
except MultipleObjectsReturned:
print '###ERROR: multiple namespace with the friendly name %s' %ns
if ok:
ok = False
try:
AttributeNamespace.objects.get(identifier=data[ns])
print '- existing namespace with identifier %s, not updated' %data[ns]
except ObjectDoesNotExist:
ok = True
except MultipleObjectsReturned:
print '###ERROR: multiple namespace with the identifier %s' %data[ns]
if ok:
nsi = AttributeNamespace(friendly_name=ns, identifier=data[ns])
nsi.save()
print '+ namespace added'
MAX_TRIES = 3
count = 0
p1, p2 = 1, 2
while p1 != p2 and count < MAX_TRIES:
p1 = getpass.getpass(prompt="Password: ")
if not p1:
raise CommandError("aborted")
p2 = getpass.getpass(prompt="Password (again): ")
if not p2:
raise CommandError("aborted")
if p1 != p2:
print "Passwords do not match. Please try again."
count = count + 1
json_data.close()
if count == MAX_TRIES:
raise CommandError("Aborting creation of user '%s' after \
%s attempts" % (username, count))
json_data = open('acs/abac/attribute_mapping.json')
data = json.load(json_data)
print '--> Attribute mapping file contains:'
pprint(data)
try:
user = User(username=username)
user.set_password(p1)
user.save()
print 'User created: %s' %user
except:
raise CommandError('Unable to create user %s' %username)
for attribute in data:
print 'Dealing with attribute %s' %attribute
for attr_decl in data[attribute]:
print 'Working on %s' %attr_decl
ns = None
try:
ns = Namespace(name='Default')
ns.save()
print 'Default namespace created: %s' %ns
except:
raise CommandError('Unable to create the default namespace')
t = attr_decl['type']
print 'Loof for a definition of %s of type %s' \
% (attribute, t)
d, c = AttributeDefinition.objects.get_or_create(attribute_name=attribute)
alias = None
try:
'''The alias in Default has the username as name'''
alias = UserAlias(alias=user.username,
user=user, namespace=ns)
alias.save()
print 'Default alias created: %s' %alias
except:
raise CommandError('Unable to create the default alias')
if c:
print "+ definition created"
else:
print "- existing definition"
if d.attribute_type != t:
d.attribute_type = t
d.save()
#view_name = user.username+'_admin_view'
view_name = 'root_admin_view'
view = None
try:
view = View(name=view_name, namespace=ns)
view.save()
print 'Root system view created: %s' %view
except:
raise CommandError('Unable to create root system view')
definitions = attr_decl['definitions']
if not definitions:
print "Missing definitions"
else:
print "Working with definitions %s" %definitions
for ns_decl in definitions:
ns_name = ns_decl[0]
print 'Namespace declared is %s' %ns_name
ns = None
ns = get_namespace_by_friendly_name(ns_name)
if not ns:
ns = get_namespace_by_identifier(ns_name)
if not ns:
print 'Namespace with name or id %s not found' %ns_name
else:
print 'Namespace found %s' %ns
view.users.add(alias)
print \
'Default alias of the root user added to its root system view'
m, c = AttributeMapInNamespace.objects.get_or_create(namespace=ns,
attribute_definition=d)
if c:
print "+ map created"
else:
print "- existing map"
names = ns_decl[1]
for name in names:
print 'Found name %s' %name
a, c = AttributeName.objects.get_or_create(attribute_map=m, name=name)
if c:
print "+ name created"
else:
print "- existing name"
#role_name = user.username+'_admin_role'
role_name = 'root_admin_role'
role=None
try:
role = Role(name=role_name, namespace=ns)
role.save()
print 'Root system role created: %s' %role
except:
raise CommandError('Unable to create root system role')
json_data.close()
view.roles.add(role)
print 'Default role added to the root system view'
a = None
try:
a = Action(name='administration')
a.save()
print 'Administration action created: %s' %a
except:
raise \
CommandError('Unable to create the administration action')
p = None
try:
p = AcsPermission(who=role, what=view, how=a)
p.save()
print 'Root administration permission created: %s' %p
except:
raise \
CommandError('Unable to create the root administration \
permission for the root administration role')
role.users.add(alias)
print 'User added to the root role'
ua = None
try:
ua = Role(name='root_user_administrator_role')
ua.save()
print 'Special role user administrator created: %s' %ua
except:
raise \
CommandError('Unable to create special \
role user administrator')
ua = None
try:
ua = Role(name='root_abac_administrator_role')
ua.save()
print 'Special role abac administrator created: %s' %ua
except:
raise \
CommandError('Unable to create special \
role abac administrator')
except:
except Exception, err:
transaction.rollback()
raise
print '---> Error due to %s' %str(err)
print '---> All transactions cancelled'
else:
transaction.commit()
print '---> Successful initialization'
print '---> Successful ABAC initialization'

View File

@ -1,149 +0,0 @@
'''
VERIDIC - Towards a centralized access control system
Copyright (C) 2011 Mikael Ates
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
'''
import json
from pprint import pprint
from django.db import transaction
from django.core.management.base import BaseCommand
from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
from acs import settings
from acs.abac.models import AttributeNamespace, AttributeDefinition, \
AttributeMapInNamespace, AttributeName
from acs.abac.core import get_namespace_by_identifier, \
get_namespace_by_friendly_name
class Command(BaseCommand):
'''
Initialize namespace and attribute mapping
'''
can_import_django_settings = True
output_transaction = True
requires_model_validation = True
option_list = BaseCommand.option_list
help = \
'No help.'
#Option to delete all existing namespace and definition
@transaction.commit_manually
def handle(self, *args, **options):
try:
print '---> Start Attribute Based Access Control application initialisation'
json_data = open('acs/abac/namespaces.json')
data = json.load(json_data)
print '--> Namespace file contains:'
pprint(data)
for ns in data:
print 'Looking for ns with friendly name %s and identifier %s' \
% (ns, data[ns])
ok = False
try:
AttributeNamespace.objects.get(friendly_name=ns)
print '- existing namespace with friendly name %s, not updated' %ns
except ObjectDoesNotExist:
ok = True
except MultipleObjectsReturned:
print '###ERROR: multiple namespace with the friendly name %s' %ns
if ok:
ok = False
try:
AttributeNamespace.objects.get(identifier=data[ns])
print '- existing namespace with identifier %s, not updated' %data[ns]
except ObjectDoesNotExist:
ok = True
except MultipleObjectsReturned:
print '###ERROR: multiple namespace with the identifier %s' %data[ns]
if ok:
nsi = AttributeNamespace(friendly_name=ns, identifier=data[ns])
nsi.save()
print '+ namespace added'
json_data.close()
json_data = open('acs/abac/attribute_mapping.json')
data = json.load(json_data)
print '--> Attribute mapping file contains:'
pprint(data)
for attribute in data:
print 'Dealing with attribute %s' %attribute
for attr_decl in data[attribute]:
print 'Working on %s' %attr_decl
t = attr_decl['type']
print 'Loof for a definition of %s of type %s' \
% (attribute, t)
d, c = AttributeDefinition.objects.get_or_create(attribute_name=attribute)
if c:
print "+ definition created"
else:
print "- existing definition"
if d.attribute_type != t:
d.attribute_type = t
d.save()
definitions = attr_decl['definitions']
if not definitions:
print "Missing definitions"
else:
print "Working with definitions %s" %definitions
for ns_decl in definitions:
ns_name = ns_decl[0]
print 'Namespace declared is %s' %ns_name
ns = None
ns = get_namespace_by_friendly_name(ns_name)
if not ns:
ns = get_namespace_by_identifier(ns_name)
if not ns:
print 'Namespace with name or id %s not found' %ns_name
else:
print 'Namespace found %s' %ns
m, c = AttributeMapInNamespace.objects.get_or_create(namespace=ns,
attribute_definition=d)
if c:
print "+ map created"
else:
print "- existing map"
names = ns_decl[1]
for name in names:
print 'Found name %s' %name
a, c = AttributeName.objects.get_or_create(attribute_map=m, name=name)
if c:
print "+ name created"
else:
print "- existing name"
json_data.close()
except Exception, err:
transaction.rollback()
print '---> Error due to %s' %str(err)
print '---> All transactions cancelled'
else:
transaction.commit()
print '---> Successful initialization'