This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
veridic/acs/core.py

608 lines
24 KiB
Python

import sys
from django.conf import settings
from models import *
def isAuthorizedRBAC0(who, what, how):
select_who = None
select_what = None
select_how = None
if str(who.__class__).find('User') > -1:
select_who = 'User'
elif str(who.__class__).find('Role') > -1:
select_who = 'Role'
else:
raise TypeError()
if str(what.__class__).find('AcsObject') > -1:
select_what = 'AcsObject'
elif str(what.__class__).find('View') > -1:
select_what = 'View'
elif str(what.__class__).find('User') > -1:
select_what = 'User'
elif str(what.__class__).find('Role') > -1:
select_what = 'Role'
elif str(what.__class__).find('Action') > -1:
select_what = 'Action'
elif str(what.__class__).find('Activity') > -1:
select_what = 'Activity'
else:
raise TypeError()
if str(how.__class__).find('Action') > -1:
select_how = 'Action'
elif str(how.__class__).find('Activity') > -1:
select_how = 'Activity'
else:
raise TypeError()
p = None
# Working list
r = []
# List of roles already checked
control = []
if select_who == 'User':
# 1. Look for a permission on who is User, what is AcsObject and how is action
if select_what == 'AcsObject':
if select_how == 'Action':
p = AcsPermission.objects.filter(
select_who='User', who_user=who,
select_what='AcsObject', what_acs_object=what,
select_how='Action', how_action=how,
)
elif select_how == 'Activity':
p = AcsPermission.objects.filter(
select_who='User', who_user=who,
select_what='AcsObject', what_acs_object=what,
select_how='Activity', how_activity=how,
)
else:
raise TypeError()
elif select_what == 'View':
if select_how == 'Action':
p = AcsPermission.objects.filter(
select_who='User', who_user=who,
select_what='View', what_view=what,
select_how='Action', how_action=how,
)
elif select_how == 'Activity':
p = AcsPermission.objects.filter(
select_who='User', who_user=who,
select_what='View', what_view=what,
select_how='Activity', how_activity=how,
)
else:
raise TypeError()
elif select_what == 'User':
if select_how == 'Action':
p = AcsPermission.objects.filter(
select_who='User', who_user=who,
select_what='User', what_user=what,
select_how='Action', how_action=how,
)
elif select_how == 'Activity':
p = AcsPermission.objects.filter(
select_who='User', who_user=who,
select_what='User', what_user=what,
select_how='Activity', how_activity=how,
)
else:
raise TypeError()
elif select_what == 'Role':
if select_how == 'Action':
p = AcsPermission.objects.filter(
select_who='User', who_user=who,
select_what='Role', what_role=what,
select_how='Action', how_action=how,
)
elif select_how == 'Activity':
p = AcsPermission.objects.filter(
select_who='User', who_user=who,
select_what='Role', what_role=what,
select_how='Activity', how_activity=how,
)
else:
raise TypeError()
elif select_what == 'Action':
if select_how == 'Action':
p = AcsPermission.objects.filter(
select_who='User', who_user=who,
select_what='Action', what_action=what,
select_how='Action', how_action=how,
)
elif select_how == 'Activity':
p = AcsPermission.objects.filter(
select_who='User', who_user=who,
select_what='Action', what_action=what,
select_how='Activity', how_activity=how,
)
else:
raise TypeError()
elif select_what == 'Activity':
if select_how == 'Action':
p = AcsPermission.objects.filter(
select_who='User', who_user=who,
select_what='Activity', what_activity=what,
select_how='Action', how_action=how,
)
elif select_how == 'Activity':
p = AcsPermission.objects.filter(
select_who='User', who_user=who,
select_what='Activity', what_activity=what,
select_how='Activity', how_activity=how,
)
else:
raise TypeError()
else:
raise TypeError()
if p:
return p
# Retreive all the direct users' roles
roles = Role.objects.filter(users__username=who.username)
# Init the working list
for role in roles:
r.append(role)
elif select_who == 'Role':
r.append(who)
else:
raise TypeError()
if not settings.ROLE_GRAPH_LIMIT:
limit = 0
print >> sys.stderr, 'No limit'
else:
limit = settings.ROLE_GRAPH_LIMIT
# 2. Look for a permissions on the user's roles
# 3. Go down in the directed graph of roles
i=0
while r and (limit==0 or i<limit):
print >> sys.stderr, 'loop : ' + str(i)
for role in r:
print >> sys.stderr, 'recherche sur : ' + role.name
if select_what == 'AcsObject':
if select_how == 'Action':
p = AcsPermission.objects.filter(
select_who='Role', who_role=role,
select_what='AcsObject', what_acs_object=what,
select_how='Action', how_action=how,
)
elif select_how == 'Activity':
p = AcsPermission.objects.filter(
select_who='Role', who_role=role,
select_what='AcsObject', what_acs_object=what,
select_how='Activity', how_activity=how,
)
else:
raise TypeError()
elif select_what == 'View':
if select_how == 'Action':
p = AcsPermission.objects.filter(
select_who='Role', who_role=role,
select_what='View', what_view=what,
select_how='Action', how_action=how,
)
elif select_how == 'Activity':
p = AcsPermission.objects.filter(
select_who='Role', who_role=role,
select_what='View', what_view=what,
select_how='Activity', how_activity=how,
)
else:
raise TypeError()
elif select_what == 'User':
if select_how == 'Action':
p = AcsPermission.objects.filter(
select_who='Role', who_role=role,
select_what='User', what_user=what,
select_how='Action', how_action=how,
)
elif select_how == 'Activity':
p = AcsPermission.objects.filter(
select_who='Role', who_role=role,
select_what='User', what_user=what,
select_how='Activity', how_activity=how,
)
else:
raise TypeError()
elif select_what == 'Role':
if select_how == 'Action':
p = AcsPermission.objects.filter(
select_who='Role', who_role=role,
select_what='Role', what_role=what,
select_how='Action', how_action=how,
)
elif select_how == 'Activity':
p = AcsPermission.objects.filter(
select_who='Role', who_role=role,
select_what='Role', what_role=what,
select_how='Activity', how_activity=how,
)
else:
raise TypeError()
elif select_what == 'Action':
if select_how == 'Action':
p = AcsPermission.objects.filter(
select_who='Role', who_role=role,
select_what='Action', what_action=what,
select_how='Action', how_action=how,
)
elif select_how == 'Activity':
p = AcsPermission.objects.filter(
select_who='Role', who_role=role,
select_what='Action', what_action=what,
select_how='Activity', how_activity=how,
)
else:
raise TypeError()
elif select_what == 'Activity':
if select_how == 'Action':
p = AcsPermission.objects.filter(
select_who='Role', who_role=role,
select_what='Activity', what_activity=what,
select_how='Action', how_action=how,
)
elif select_how == 'Activity':
p = AcsPermission.objects.filter(
select_who='Role', who_role=role,
select_what='Activity', what_activity=what,
select_how='Activity', how_activity=how,
)
else:
raise TypeError()
else:
raise TypeError()
if p:
return p
tmp = []
for role in r:
# Append the list of roles already checked
control.append(role)
for it in role.roles.all():
if it not in control:
'''Avoid loop'''
tmp.append(it)
r = tmp
i=i+1
if i==limit and limit!=0:
print >> sys.stderr, 'Limit reached'
return None
def isAuthorizedRBAC1(who, what, how):
p = None
p = isAuthorizedRBAC0(who, what, how)
if p:
return p
select_how = None
if str(how.__class__).find('Action') > -1:
select_how = 'Action'
elif str(how.__class__).find('Activity') > -1:
select_how = 'Activity'
else:
raise TypeError()
a = []
if select_how == 'Action':
# Retreive all the direct users' roles
activities = Activity.objects.filter(actions__name=how.name)
# Init the working list
for activity in activities:
a.append(activity)
elif select_how == 'Activity':
a.append(how)
else:
raise TypeError()
if not settings.ACTIVITY_GRAPH_LIMIT:
limit = 0
print >> sys.stderr, 'No limit'
else:
limit = settings.ACTIVITY_GRAPH_LIMIT
control = []
i=0
while a and (limit==0 or i<limit):
print >> sys.stderr, 'loop : ' + str(i)
for activity in a:
print >> sys.stderr, 'recherche sur : ' + activity.name
p = isAuthorizedRBAC0(who, what, activity)
if p:
return p
tmp = []
for activity in a:
# Append the list of roles already checked
control.append(activity)
for it in Activity.objects.filter(activities__name=activity.name):
if it not in control:
'''Avoid loop'''
tmp.append(it)
a = tmp
i=i+1
if i==limit and limit!=0:
print >> sys.stderr, 'Limit reached'
return None
def isAuthorizedRBAC2(who, what, how):
p = None
p = isAuthorizedRBAC1(who, what, how)
if p:
return p
select_what = None
if str(what.__class__).find('AcsObject') > -1:
select_what = 'AcsObject'
elif str(what.__class__).find('View') > -1:
select_what = 'View'
elif str(what.__class__).find('User') > -1:
select_what = 'User'
elif str(what.__class__).find('Role') > -1:
select_what = 'Role'
elif str(what.__class__).find('Action') > -1:
select_what = 'Action'
elif str(what.__class__).find('Activity') > -1:
select_what = 'Activity'
else:
raise TypeError()
# Working list
v = []
if select_what == 'AcsObject':
views = View.objects.filter(acs_objects__name=what.name)
for view in views:
v.append(view)
elif select_what == 'User':
views = View.objects.filter(users__username=what.username)
for view in views:
v.append(view)
elif select_what == 'Role':
views = View.objects.filter(roles__name=what.name)
for view in views:
v.append(view)
elif select_what == 'Action':
views = View.objects.filter(actions__name=what.name)
for view in views:
v.append(view)
elif select_what == 'Activity':
views = View.objects.filter(activities__name=what.name)
for view in views:
v.append(view)
elif select_what == 'View':
v.append(what)
else:
raise TypeError()
if not settings.VIEW_GRAPH_LIMIT:
limit = 0
print >> sys.stderr, 'No limit'
else:
limit = settings.VIEW_GRAPH_LIMIT
control = []
i=0
while v and (limit==0 or i<limit):
print >> sys.stderr, 'loop : ' + str(i)
for view in v:
print >> sys.stderr, 'recherche sur : ' + view.name
p = isAuthorizedRBAC1(who, view, how)
if p:
return p
tmp = []
for view in v:
# Append the list of roles already checked
control.append(view)
for it in View.objects.filter(views__name=view.name):
if it not in control:
'''Avoid loop'''
tmp.append(it)
v = tmp
i=i+1
if i==limit and limit!=0:
print >> sys.stderr, 'Limit reached'
return None
def draw_graph(path, display='neato'):
import networkx as nx
import matplotlib
import matplotlib.cbook
import matplotlib.pyplot as plt
G=nx.DiGraph()
gg = []
nodes_users = []
nodes_roles = []
nodes_acs_objects = []
nodes_views = []
nodes_activities = []
nodes_actions = []
nodes_permissions = []
edges_belonging = []
edges_inheritance = []
roles = Role.objects.all()
for role in roles:
if not role.name in nodes_roles:
nodes_roles.append(role.name)
for it in role.roles.all():
if not it.name in nodes_roles:
nodes_roles.append(it.name)
edges_inheritance.append((role.name, it.name,0))
gg.append((role.name, it.name,0)) #gr.add_edge((role.name, it.name))
print >> sys.stderr, 'edge : ' + role.name + ':' + it.name
users = role.users.all()
for it in users:
if not it.username in nodes_users:
nodes_users.append(it.username)
edges_belonging.append((role.name, it.username,0))
gg.append((role.name, it.username, 0)) #gr.add_edge((role.name, it.username))
print >> sys.stderr, 'edge : ' + role.name + ':' + it.username
views = View.objects.all()
for view in views:
if not view.name in nodes_views:
nodes_views.append(view.name)
nodes_views.append(view.name)
for it in view.views.all():
if not it.name in nodes_views:
nodes_views.append(it.name)
edges_inheritance.append((view.name, it.name,0))
gg.append((view.name, it.name,0)) #gr.add_edge((view.name, it.name))
print >> sys.stderr, 'edge : ' + view.name + ':' + it.name
acs_objects = view.acs_objects.all()
for it in acs_objects:
if not it.name in nodes_acs_objects:
nodes_acs_objects.append(it.name)
edges_belonging.append((view.name, it.name,0))
gg.append((view.name, it.name,0)) #gr.add_edge((view.name, it.name))
print >> sys.stderr, 'edge : ' + view.name + ':' + it.name
users = view.users.all()
for it in users:
if not it.username in nodes_users:
nodes_users.append(it.username)
edges_belonging.append((view.name, it.username,0))
gg.append((view.name, it.username,0)) #gr.add_edge((view.name, it.username))
print >> sys.stderr, 'edge : ' + view.name + ':' + it.username
roles = view.roles.all()
for it in roles:
if not it.name in nodes_roles:
nodes_roles.append(it.name)
edges_belonging.append((view.name, it.name,0))
gg.append((view.name, it.name,0)) #gr.add_edge((view.name, it.name))
print >> sys.stderr, 'edge : ' + view.name + ':' + it.name
actions = view.actions.all()
for it in actions:
if not it.name in nodes_actions:
nodes_actions.append(it.name)
edges_belonging.append((view.name, it.name,0))
gg.append((view.name, it.name,0)) #gr.add_edge((view.name, it.name))
print >> sys.stderr, 'edge : ' + view.name + ':' + it.name
activities = view.activities.all()
for it in activities:
if not it.name in nodes_activities:
nodes_activities.append(it.name)
edges_belonging.append((view.name, it.name,0))
gg.append((view.name, it.name,0)) #gr.add_edge((view.name, it.name))
print >> sys.stderr, 'edge : ' + view.name + ':' + it.name
activities = Activity.objects.all()
for activity in activities:
if not activity.name in nodes_activities:
nodes_activities.append(activity.name)
for it in activity.activities.all():
if not it.name in nodes_activities:
nodes_activities.append(it.name)
edges_inheritance.append((activity.name, it.name,0))
gg.append((activity.name, it.name,0)) #gr.add_edge((activity.name, it.name))
print >> sys.stderr, 'edge : ' + activity.name + ':' + it.name
actions = activity.actions.all()
for it in actions:
if not it.name in nodes_actions:
nodes_actions.append(it.name)
nodes_actions.append(it.name)
edges_belonging.append((activity.name, it.name,0))
gg.append((activity.name, it.name,0)) #gr.add_edge((activity.name, it.name))
print >> sys.stderr, 'edge : ' + activity.name + ':' + it.name
edges_perm_who = []
edges_perm_what = []
edges_perm_how = []
permissions = AcsPermission.objects.all()
i=0
for permission in permissions:
nodes_permissions.append('P'+str(i))
if permission.select_who == 'User':
if not permission.who_user.username in nodes_users:
nodes_users.append(permission.who_user.username)
edges_perm_who.append(('P'+str(i), permission.who_user.username, 0))
gg.append(('P'+str(i), permission.who_user.username, 0)) #gr.add_edge(('P'+str(i), permission.who_user.username))
else:
if not permission.who_role.name in nodes_roles:
nodes_roles.append(permission.who_role.name)
edges_perm_who.append(('P'+str(i), permission.who_role.name, 0))
gg.append(('P'+str(i), permission.who_role.name, 0)) #gr.add_edge(('P'+str(i), permission.who_role.name))
if permission.select_what == 'User':
if not permission.what_user.username in nodes_users:
nodes_users.append(permission.what_user.username)
edges_perm_what.append(('P'+str(i), permission.what_user.username, 0))
gg.append(('P'+str(i), permission.what_user.username, 0))
elif permission.select_what == 'Role':
if not permission.what_role.name in nodes_roles:
nodes_roles.append(permission.what_role.name)
edges_perm_what.append
gg.append(('P'+str(i), permission.what_role.name, 0))
elif permission.select_what == 'View':
if not permission.what_view.name in nodes_views:
nodes_views.append(permission.what_view.name)
edges_perm_what.append(('P'+str(i), permission.what_view.name, 0))
gg.append(('P'+str(i), permission.what_view.name, 0))
elif permission.select_what == 'Action':
if not permission.what_action.name in nodes_actions:
nodes_actions.append(permission.what_action.name)
edges_perm_what.append(('P'+str(i), permission.what_action.name, 0))
gg.append(('P'+str(i), permission.what_action.name, 0))
elif permission.select_what == 'Activity':
if not permission.what_activity.name in nodes_activities:
nodes_activities.append(permission.what_activity.name)
edges_perm_what.append(('P'+str(i), permission.what_activity.name, 0))
gg.append(('P'+str(i), permission.what_activity.name, 0))
else:
if not permission.what_acs_object.name in nodes_acs_objects:
nodes_acs_objects.append(permission.what_acs_object.name)
edges_perm_what.append(('P'+str(i), permission.what_acs_object.name, 0))
gg.append(('P'+str(i), permission.what_acs_object.name, 0))
if permission.select_how == 'Action':
if not permission.how_action.name in nodes_actions:
nodes_actions.append(permission.how_action.name)
edges_perm_how.append(('P'+str(i), permission.how_action.name, 0))
gg.append(('P'+str(i), permission.how_action.name, 0))
else:
if not permission.how_activity.name in nodes_activities:
nodes_activities.append(permission.how_activity.name)
edges_perm_how.append(('P'+str(i), permission.how_activity.name, 0))
gg.append(('P'+str(i), permission.how_activity.name, 0))
i=i+1
G.add_weighted_edges_from(gg)
try:
from networkx import graphviz_layout
except ImportError:
raise ImportError("This example needs Graphviz and either PyGraphviz or Pydot")
plt.figure(1,figsize=(8,8))
# layout graphs with positions using graphviz neato
#pos=nx.graphviz_layout(G,prog='twopi')
#pos=nx.graphviz_layout(G,prog='fdp')
#pos=nx.graphviz_layout(G,prog='dot')
#pos=nx.graphviz_layout(G,prog='circo')
pos=nx.graphviz_layout(G,prog=display)
nx.draw_networkx_nodes(G,pos,node_size=200,nodelist=nodes_roles,node_color='#1647b5')
nx.draw_networkx_nodes(G,pos,node_size=200,nodelist=nodes_users,node_color='#2c87d4')
nx.draw_networkx_nodes(G,pos,node_size=200,nodelist=nodes_views,node_color='#32ab46')
nx.draw_networkx_nodes(G,pos,node_size=200,nodelist=nodes_acs_objects,node_color='#2cd47b')
nx.draw_networkx_nodes(G,pos,node_size=200,nodelist=nodes_activities,node_color='#ab9032')
nx.draw_networkx_nodes(G,pos,node_size=200,nodelist=nodes_actions,node_color='#bdbc46')
nx.draw_networkx_nodes(G,pos,node_size=250,nodelist=nodes_permissions,node_color='#db533c')
#nx.draw_networkx_edges(G,pos,edgelist=edges_perm_who,alpha=0.8,width=3,edge_color='#1647b5')
#nx.draw_networkx_edges(G,pos,edgelist=edges_perm_what,alpha=0.8,width=3,edge_color='#32ab46')
#nx.draw_networkx_edges(G,pos,edgelist=edges_perm_how,alpha=0.8,width=3,edge_color='#ab9032')
# XXX: Modify to work with the hack on arrows
# https://networkx.lanl.gov/trac/ticket/78
nx.draw_networkx_edges(G,pos,edgelist=edges_perm_who,alpha=0.8,width=1.5,edge_color='b')
nx.draw_networkx_edges(G,pos,edgelist=edges_perm_what,alpha=0.8,width=1.5,edge_color='g')
nx.draw_networkx_edges(G,pos,edgelist=edges_perm_how,alpha=0.8,width=1.5,edge_color='y')
nx.draw_networkx_edges(G,pos,edgelist=edges_inheritance,width=1)
nx.draw_networkx_edges(G,pos,edgelist=edges_belonging,alpha=0.7)
nx.draw_networkx_labels(G, pos, font_size=10, font_weight='bold')
plt.axis('off')
plt.savefig(path,dpi=150)