Sync attribute_aggregator with Authentic 2.

This commit is contained in:
Mikaël Ates 2011-12-26 10:42:21 +01:00
parent e5a7a5367b
commit 7752b83be1
8 changed files with 2745 additions and 194 deletions

View File

@ -26,8 +26,8 @@ import re
from django.db import transaction
from django.core.exceptions import ObjectDoesNotExist
from attribute_aggregator.xacml_constants import *
from attribute_aggregator.mapping import ATTRIBUTE_MAPPING
from acs.attribute_aggregator.xacml_constants import *
from acs.attribute_aggregator.mapping import ATTRIBUTE_MAPPING
logger = logging.getLogger('attribute_aggregator')
@ -50,7 +50,7 @@ def get_all_attribute_definitions():
def get_all_sources():
from attribute_aggregator.models import AttributeSource
from acs.attribute_aggregator.models import AttributeSource
return AttributeSource.objects.all()
@ -60,14 +60,79 @@ def get_full_definition(definition):
return ATTRIBUTE_MAPPING[definition]
def get_def_name_from_oid(oid):
if not oid:
return None
for def_name, content in ATTRIBUTE_MAPPING.items():
if 'oid' in content:
if content['oid'] == oid:
return def_name
return None
def get_oid_from_def_name(definition_name):
if not definition_name or not definition_name in ATTRIBUTE_MAPPING \
or not 'oid' in ATTRIBUTE_MAPPING[definition_name]:
return None
return ATTRIBUTE_MAPPING[definition_name]['oid']
def get_def_name_from_alias(alias):
if not alias:
return None
for def_name, content in ATTRIBUTE_MAPPING.items():
if 'alias' in content:
if alias in content['alias']:
return def_name
return None
def get_definition_from_oid(oid):
if not oid:
return None
for def_name, content in ATTRIBUTE_MAPPING.items():
if 'oid' in content:
if content['oid'] == oid:
return ATTRIBUTE_MAPPING[def_name]
return None
def get_definition_from_alias(alias):
if not alias:
return None
for def_name, content in ATTRIBUTE_MAPPING.items():
if 'alias' in content:
if alias in content['alias']:
return ATTRIBUTE_MAPPING[def_name]
return None
def get_profile_field_name_from_definition(definition):
if definition and definition in ATTRIBUTE_MAPPING \
and 'profile_field_name' in ATTRIBUTE_MAPPING[definition]:
return ATTRIBUTE_MAPPING[definition]['profile_field_name']
return None
def get_definition_from_profile_field_name(field_name):
if not field_name:
return None
for def_name, content in ATTRIBUTE_MAPPING.items():
if 'profile_field_name' in content:
if field_name == content['profile_field_name']:
return def_name
return None
def get_def_name_from_name_and_ns_of_attribute(name, namespace):
if not name or not namespace:
return None
for def_name, content in ATTRIBUTE_MAPPING.items():
if namespace in content["definitions"].keys():
if name in content["definitions"][namespace]["identifiers"]:
if "namespaces" in content \
and namespace in content["namespaces"].keys():
if name in content["namespaces"][namespace]["identifiers"]:
return def_name
if name in content["definitions"][namespace]["friendly_name"]:
if name in content["namespaces"][namespace]["friendly_names"]:
return def_name
return None
@ -75,10 +140,26 @@ def get_def_name_from_name_and_ns_of_attribute(name, namespace):
def get_attribute_name_in_namespace(definition, namespace):
if not definition or not namespace:
return None
logger.debug('get_attribute_name_in_namespace: look for %s in %s' \
% (definition, namespace))
if definition in ATTRIBUTE_MAPPING:
if namespace in ATTRIBUTE_MAPPING[definition]["definitions"]:
logger.debug('get_attribute_name_in_namespace: definition found')
if "namespaces" in ATTRIBUTE_MAPPING[definition]\
and namespace in ATTRIBUTE_MAPPING[definition]["namespaces"]:
logger.debug('get_attribute_name_in_namespace: namespace found')
return ATTRIBUTE_MAPPING[definition]\
["definitions"][namespace]["identifiers"][0]
["namespaces"][namespace]["identifiers"][0]
return None
def get_attribute_friendly_name_in_namespace(definition, namespace):
if not definition or not namespace:
return None
if definition in ATTRIBUTE_MAPPING:
if "namespaces" in ATTRIBUTE_MAPPING[definition]\
and namespace in ATTRIBUTE_MAPPING[definition]["namespaces"]:
return ATTRIBUTE_MAPPING[definition]\
["namespaces"][namespace]["friendly_names"][0]
return None
@ -89,6 +170,22 @@ def get_attribute_type_of_definition(definition):
return ATTRIBUTE_MAPPING[definition]["type"]
def is_alias_of_definition(definition_name, alias):
if definition_name in ATTRIBUTE_MAPPING \
and 'alias' in ATTRIBUTE_MAPPING[definition_name] \
and alias in ATTRIBUTE_MAPPING[definition_name]['alias']:
return True
return False
def is_oid_of_definition(definition_name, oid):
if definition_name in ATTRIBUTE_MAPPING \
and 'oid' in ATTRIBUTE_MAPPING[definition_name] \
and oid == ATTRIBUTE_MAPPING[definition_name]['oid']:
return True
return False
def convert_from_string(definition_name, value):
if not definition_name in ATTRIBUTE_MAPPING:
return None
@ -153,7 +250,7 @@ def load_or_create_user_profile(user=None, no_cleanup=False):
If no_cleanup: return profile if any without removing outdated
assertions
'''
from attribute_aggregator.models import UserAttributeProfile
from acs.attribute_aggregator.models import UserAttributeProfile
profile = None
try:
if user:
@ -194,7 +291,7 @@ def load_or_create_user_profile(user=None, no_cleanup=False):
def get_user_alias_in_source(user, source):
from attribute_aggregator.models import UserAliasInSource
from acs.attribute_aggregator.models import UserAliasInSource
try:
alias = UserAliasInSource.objects.get(user=user, source=source)
return alias.name
@ -203,7 +300,7 @@ def get_user_alias_in_source(user, source):
def set_user_alias_in_source(user, source, name, force_change=False):
from attribute_aggregator.models import UserAliasInSource
from acs.attribute_aggregator.models import UserAliasInSource
logger.debug('set_user_alias_in_source: set alias %s for user %s in \
source %s' % (name, user, source))
alias = None

View File

@ -21,22 +21,28 @@
import logging
import ldap
from attribute_aggregator.core import get_user_alias_in_source, \
get_attribute_name_in_namespace
from acs.attribute_aggregator.core import get_user_alias_in_source
logger = logging.getLogger('acs')
logger = logging.getLogger('attribute_aggregator.ldap_sources')
def get_all_attributes(user, definitions=None, **kwargs):
def get_attributes(user, definitions=None, source=None, **kwargs):
'''
Return attributes dictionnary
Dictionnary format:
attributes = dict()
data_from_source = list()
a1 = dict()
a1['definition'] = definition_name
a1['name'] = attribute_name_in_ns
a1['namespace'] = ns_name
a1['oid'] = definition_name
Or
a1['definition'] = definition_name
definition may be the definition name like 'gn'
or an alias like 'givenName'
Or
a1['name'] = attribute_name_in_ns
a1['namespace'] = ns_name
a1['values'] = list_of_values
data_from_source.append(a1)
...
@ -47,28 +53,82 @@ def get_all_attributes(user, definitions=None, **kwargs):
Else, definition is searched by 'name' and 'namespece' keys.
'''
if not user:
logger.error('get_all_attributes: No user provided')
logger.error('get_attributes: No user provided')
return None
logger.debug('get_all_attributes: Searching attributes for user %s' % user)
logger.debug('get_attributes: Searching attributes for user %s' \
% user)
from attribute_aggregator.models import LdapSource
sources = LdapSource.objects.all()
from acs.attribute_aggregator.models import LdapSource
sources = None
if source:
logger.debug('get_attributes: The required source is %s' % source)
try:
sources = [source.ldapsource]
logger.debug('get_attributes: The source is an LDAP source!')
except:
logger.debug('get_attributes: \
The required source is not a LDAP one')
return None
else:
sources = LdapSource.objects.all()
if not sources:
logger.debug('get_all_attributes: No LDAP source configured')
logger.debug('get_attributes: No LDAP source configured')
return None
attributes = {}
attributes = dict()
for source in sources:
logger.debug('get_all_attributes: The LDAP source is known as %s' \
logger.debug('get_attributes: The LDAP source is known as %s' \
% source.name)
identifier = get_user_alias_in_source(user, source)
identifier = None
'''
Check if the user is authenticated by LDAP.
If it is, grab the user dn from the LDAPUser object
'''
try:
from django_auth_ldap.backend import LDAPBackend
backend = LDAPBackend()
u = backend.get_user(user.id)
dn = u.ldap_user.dn
if not dn:
logger.debug('get_attributes: \
User not logged with LDAP')
else:
logger.debug('get_attributes: \
User logged with dn %s' % dn)
'''is it logged in that source?'''
logger.debug('get_attributes: \
Is the user logged with the source %s?' % source.name)
try:
l = ldap.open(source.server)
l.protocol_version = ldap.VERSION3
username = source.user
password = source.password
if username and password:
l.simple_bind(username, password)
ldap_result_id = \
l.search(dn, ldap.SCOPE_BASE,
attrlist=['objectClass'])
result_type, result_data = l.result(ldap_result_id, 0)
logger.debug('get_attributes: Yes it is, result %s %s' \
% (result_type, result_data))
identifier = dn
except ldap.LDAPError, err:
logger.debug('get_attributes: \
User dn %s unknown in %s or error %s' \
% (dn, source.name, str(err)))
except Exception, err:
logger.error('get_attributes: \
Error working with the LDAP backend %s' %str(err))
if not identifier:
logger.error('get_all_attributes: No user identifier known into that \
source')
identifier = get_user_alias_in_source(user, source)
if not identifier:
logger.error('get_attributes: \
No user identifier known into that source')
else:
logger.debug('get_all_attributes: the user is known as %s in source %s' \
logger.debug('get_attributes: \
the user is known as %s in source %s' \
% (identifier, source.name))
try:
@ -79,52 +139,56 @@ def get_all_attributes(user, definitions=None, **kwargs):
if username and password:
l.simple_bind(username, password)
except ldap.LDAPError, err:
logger.error('get_all_attributes: an error occured at binding due \
to %s' % err)
logger.error('get_attributes: \
an error occured at binding due to %s' % err)
else:
base_dn = source.base
search_scope = ldap.SCOPE_SUBTREE
'''
No seach of user with the scope, only exact dn
'''
# base_dn = source.base
# search_scope = ldap.SCOPE_SUBTREE
search_scope = ldap.SCOPE_BASE
retrieve_attributes = None
if definitions:
retrieve_attributes = [\
get_attribute_name_in_namespace(definition,
'X500') for definition in definitions]
dn = ldap.dn.explode_dn(identifier,
flags=ldap.DN_FORMAT_LDAPV3)
search_filter = dn[0]
logger.debug('get_all_attributes: rdn is %s' % search_filter)
#The definition name is the ldap attribute name
logger.debug('get_attributes: attributes requested \
are %s' % definitions)
retrieve_attributes = \
[d.encode('utf-8') for d in definitions]
# dn = ldap.dn.explode_dn(identifier,
# flags=ldap.DN_FORMAT_LDAPV3)
# search_filter = dn[0]
# logger.debug('get_attributes: rdn is %s' % search_filter)
data = []
try:
ldap_result_id = l.search(base_dn, search_scope,
search_filter, retrieve_attributes)
# ldap_result_id = l.search(base_dn, search_scope,
# search_filter, retrieve_attributes)
ldap_result_id = l.search(identifier, search_scope,
attrlist=retrieve_attributes)
result_type, result_data = l.result(ldap_result_id, 0)
logger.debug('get_all_attributes: result %s %s' % (result_type,
result_data))
logger.debug('get_attributes: result %s %s' \
% (result_type, result_data))
for d, dic in result_data:
logger.debug('get_all_attributes: found %s' % d)
logger.debug('get_attributes: found %s' % d)
if d == identifier:
logger.debug('get_all_attributes: Attributes are %s' \
% dic)
logger.debug('get_attributes: \
Attributes are %s' % dic)
for key in dic.keys():
attr = {}
attr['name'] = key
attr['definition'] = key
attr['values'] = [\
a.decode('utf-8') for a in dic[key]]
attr['namespace'] = 'X500'
data.append(attr)
except ldap.LDAPError, err:
logger.error('get_all_attributes: an error occured at searching \
due to %s' % err)
logger.error('get_attributes: \
an error occured at searching due to %s' % err)
else:
if not data:
logger.error('get_all_attributes: no attribute found')
logger.error('get_attributes: no attribute found')
else:
attributes[source.name] = data
logger.debug('get_all_attributes: the attributes returned are %s' % attributes)
logger.debug('get_attributes: the attributes returned are %s' \
% attributes)
return attributes
def get_listed_attributes(user, definitions, **kwargs):
return get_all_attributes(user, definitions=definitions, **kwargs)

File diff suppressed because it is too large Load Diff

View File

@ -27,19 +27,31 @@ from django.utils.translation import ugettext as _
from django.db import models
from django.contrib.auth.models import User
from attribute_aggregator.signals import any_attributes_call, \
listed_attributes_call
from attribute_aggregator.mapping import ATTRIBUTE_MAPPING
from attribute_aggregator.core import convert_from_string, \
get_def_name_from_name_and_ns_of_attribute, iso8601_to_datetime
from acs.attribute_aggregator.signals import any_attributes_call, \
listed_attributes_call, listed_attributes_with_source_call
from acs.attribute_aggregator.mapping import ATTRIBUTE_MAPPING, \
ATTRIBUTE_NAMESPACES
from acs.attribute_aggregator.core import convert_from_string, \
get_def_name_from_name_and_ns_of_attribute, iso8601_to_datetime, \
get_def_name_from_oid, get_def_name_from_alias, \
is_alias_of_definition, is_oid_of_definition
logger = logging.getLogger('attribute_aggregator')
ATTRIBUTES_NS = [('Default', 'Default')] \
+ [(ns, ns) for ns in ATTRIBUTE_NAMESPACES]
class AttributeSource(models.Model):
name = models.CharField(max_length = 200, unique=True)
namespace = models.CharField(max_length = 200, blank=True, null=True)
name = models.CharField(
verbose_name = _("Name"),
max_length = 200, unique=True)
namespace = models.CharField(
verbose_name = _("Namespace"),
max_length = 100,
choices = ATTRIBUTES_NS, default = ATTRIBUTES_NS[0])
def __unicode__(self):
return self.name
@ -67,14 +79,30 @@ def get_all_sources():
class LdapSource(AttributeSource):
server = models.CharField(max_length=200, unique=True)
user = models.CharField(max_length=200, blank=True, null=True)
password = models.CharField(max_length=200, blank=True, null=True)
base = models.CharField(max_length=200)
port = models.IntegerField(default=389)
ldaps = models.BooleanField(default=False)
certificate = models.TextField(blank=True)
is_auth_backend = models.BooleanField(default=False)
server = models.CharField(
verbose_name = _("Server"),
max_length=200, unique=True)
user = models.CharField(
verbose_name = _("User"),
max_length=200, blank=True, null=True)
password = models.CharField(
verbose_name = _("Password"),
max_length=200, blank=True, null=True)
base = models.CharField(
verbose_name = _("Base"),
max_length=200)
port = models.IntegerField(
verbose_name = _("Port"),
default=389)
ldaps = models.BooleanField(
verbose_name = _("LDAPS"),
default=False)
certificate = models.TextField(
verbose_name = _("Certificate"),
blank=True)
is_auth_backend = models.BooleanField(
verbose_name = _("Is it used for authentication?"),
default=False)
def __init__(self, *args, **kwargs):
super(LdapSource, self).__init__(*args, **kwargs)
@ -82,10 +110,14 @@ class LdapSource(AttributeSource):
class UserAliasInSource(models.Model):
name = models.CharField(max_length = 200)
name = models.CharField(
verbose_name = _("Name"),
max_length = 200)
source = models.ForeignKey(AttributeSource,
verbose_name = _('Attribute Source'))
user = models.ForeignKey(User, related_name='user_alias_in_source')
user = models.ForeignKey(User,
verbose_name = _("User"),
related_name='user_alias_in_source')
class Meta:
verbose_name = _('alias in source')
@ -101,11 +133,24 @@ class AttributeData:
def __init__(self, definition, values=None, source=None,
expiration_date=None):
self.definition = definition
'''
definition can be given by its name, an alias or an oid
'''
self.definition = None
if definition in ATTRIBUTE_MAPPING:
self.definition = definition
else:
d = get_def_name_from_oid(definition)
if d:
self.definition = d
else:
self.definition = get_def_name_from_alias(definition)
if not self.definition:
raise Exception('Definition not found.')
self.values = list()
if values:
for value in values:
if convert_from_string(definition, value):
if convert_from_string(self.definition, value):
self.values.append(value.encode('utf-8'))
if isinstance(source, AttributeSource):
self.source_id = source.id
@ -155,7 +200,8 @@ class AttributeData:
return list()
def get_converted_values(self):
return [convert_from_string(self.definition, value) for value in self.values]
return [convert_from_string(self.definition, value) \
for value in self.values]
def get_source(self):
try:
@ -196,8 +242,7 @@ class AttributeData:
s = "AttributeData"
values = self.get_values()
if values:
s += " %s with values %s" % (self.get_definition(),
[v for v in values])
s += " %s with values %s" % (self.get_definition(), values)
source = self.get_source()
if source:
s += " from %s" % str(source)
@ -244,6 +289,9 @@ class UserAttributeProfile(models.Model):
return []
def get_data_of_definition(self, definition, in_list=None):
'''
definition can be given by its name, an alias or an oid
'''
l = None
if in_list:
l = in_list
@ -251,7 +299,9 @@ class UserAttributeProfile(models.Model):
l = self.get_all_data()
if not l:
return []
return [d for d in l if d.get_definition() == definition]
return [d for d in l if d.get_definition() == definition \
or is_alias_of_definition(d.get_definition(), definition) \
or is_oid_of_definition(d.get_definition(), definition)]
def get_freshest_data_of_definition(self, definition):
l = self.get_data_of_definition(definition)
@ -297,9 +347,14 @@ class UserAttributeProfile(models.Model):
attributes = dict()
data_from_source = list()
a1 = dict()
a1['definition'] = definition_name
a1['name'] = attribute_name_in_ns
a1['namespace'] = ns_name
a1['oid'] = definition_name
Or
a1['definition'] = definition_name
definition may be the definition name like 'gn'
or an alias like 'givenName'
Or
a1['name'] = attribute_name_in_ns
a1['namespace'] = ns_name
a1['values'] = list_of_values
data_from_source.append(a1)
...
@ -321,9 +376,8 @@ class UserAttributeProfile(models.Model):
logger.debug('load_by_dic: attributes: %s' \
% str(dictionnary[source_name]))
for attribute in dictionnary[source_name]:
if (not ('definition' in attribute \
and attribute['definition'] \
in ATTRIBUTE_MAPPING) \
if (not 'oid' in attribute \
and not 'definition' in attribute \
and not('name' in attribute \
and 'namespace' in attribute)) \
or not 'values' in attribute:
@ -331,10 +385,15 @@ class UserAttributeProfile(models.Model):
missing data to treat %s' % str(attribute))
else:
definition = None
if 'definition' in attribute \
and attribute['definition'] \
in ATTRIBUTE_MAPPING:
definition = attribute['definition']
if 'oid' in attribute:
definition = \
get_def_name_from_oid(attribute['oid'])
elif 'definition' in attribute:
if attribute['definition'] in ATTRIBUTE_MAPPING:
definition = attribute['definition']
else:
definition = \
get_def_name_from_alias(attribute['definition'])
else:
definition = \
get_def_name_from_name_and_ns_of_attribute(\
@ -399,16 +458,70 @@ class UserAttributeProfile(models.Model):
self.load_by_dic(attrs[1])
def load_listed_attributes(self, definitions):
'''
definitions can be given by its name, an alias or an oid
'''
if self.user:
attributes_provided = listed_attributes_call.send(sender=None,
user=self.user, definitions=definitions)
for attrs in attributes_provided:
defs = []
for d in definitions:
if d in ATTRIBUTE_MAPPING:
defs.append(d)
else:
df = get_def_name_from_oid(d)
if df:
defs.append(df)
else:
df = get_def_name_from_alias(d)
if df:
defs.append(df)
if defs:
logger.info('load_listed_attributes: \
attributes_call connected to function %s' % \
attrs[0].__name__)
attributes required are %s' % defs)
attributes_provided = listed_attributes_call.send(sender=None,
user=self.user, definitions=defs)
for attrs in attributes_provided:
logger.info('load_listed_attributes: \
attributes_call connected to function %s' % \
attrs[0].__name__)
logger.info('load_listed_attributes: \
attributes provided are %s' %str(attrs[1]))
self.load_by_dic(attrs[1])
else:
logger.info('load_listed_attributes: no definitions \
of attributes to load with %s' % str(definitions))
def load_listed_attributes_with_source(self, definitions, source):
if not source:
return
if self.user:
defs = []
for d in definitions:
if d in ATTRIBUTE_MAPPING:
defs.append(d)
else:
df = get_def_name_from_oid(d)
if df:
defs.append(df)
else:
df = get_def_name_from_alias(d)
if df:
defs.append(df)
if defs:
logger.info('load_listed_attributes: \
attributes provided are %s' %str(attrs[1]))
self.load_by_dic(attrs[1])
attributes required are %s from %s' % (defs, source))
attributes_provided = \
listed_attributes_with_source_call.send(sender=None,
user=self.user, definitions=defs, source=source)
for attrs in attributes_provided:
logger.info('load_listed_attributes: \
attributes_call connected to function %s' % \
attrs[0].__name__)
logger.info('load_listed_attributes: \
attributes provided are %s' %str(attrs[1]))
self.load_by_dic(attrs[1])
else:
logger.info('load_listed_attributes: no definitions \
of attributes to load with %s' % str(definitions))
def cleanup(self):
l = self.get_all_data()

View File

@ -18,13 +18,21 @@
'''
import ldap_sources
import user_profile
from django.dispatch import Signal
from attribute_aggregator.ldap_sources import get_all_attributes, \
get_listed_attributes
any_attributes_call = Signal(providing_args = ["user"])
listed_attributes_call = Signal(providing_args = ["user", "definitions"])
listed_attributes_with_source_call = Signal(providing_args = \
["user", "definitions", "source"])
any_attributes_call.connect(get_all_attributes)
listed_attributes_call.connect(get_listed_attributes)
any_attributes_call.connect(ldap_sources.get_attributes)
listed_attributes_call.connect(ldap_sources.get_attributes)
listed_attributes_with_source_call.connect(ldap_sources.get_attributes)
any_attributes_call.connect(user_profile.get_attributes)
listed_attributes_call.connect(user_profile.get_attributes)
listed_attributes_with_source_call.connect(user_profile.get_attributes)

View File

@ -0,0 +1,16 @@
"""
This file demonstrates writing tests using the unittest module. These will pass
when you run "manage.py test".
Replace this with more appropriate tests for your application.
"""
from django.test import TestCase
class SimpleTest(TestCase):
def test_basic_addition(self):
"""
Tests that 1 + 1 always equals 2.
"""
self.assertEqual(1 + 1, 2)

View File

@ -0,0 +1,117 @@
'''
VERIDIC - Towards a centralized access control system
Copyright (C) 2011 Mikael Ates
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
'''
import logging
from django.contrib.auth.models import SiteProfileNotAvailable
from django.core.exceptions import ObjectDoesNotExist
from acs.attribute_aggregator.core import get_profile_field_name_from_definition, \
get_definition_from_profile_field_name
logger = logging.getLogger('attribute_aggregator.user_profile')
SOURCE_NAME = 'USER_PROFILE'
def get_attributes(user, definitions=None, source=None, **kwargs):
'''
Return attributes dictionnary
Dictionnary format:
attributes = dict()
data_from_source = list()
a1 = dict()
a1['oid'] = definition_name
Or
a1['definition'] = definition_name
definition may be the definition name like 'gn'
or an alias like 'givenName'
Or
a1['name'] = attribute_name_in_ns
a1['namespace'] = ns_name
a1['values'] = list_of_values
data_from_source.append(a1)
...
data_from_source.append(a2)
attributes[source_name] = data_from_source
First attempt on 'definition' key.
Else, definition is searched by 'name' and 'namespece' keys.
'''
from models import AttributeSource
try:
AttributeSource.objects.get(name=SOURCE_NAME)
except:
logger.debug('get_attributes: \
Profile source not configured')
return None
if source and source.name != SOURCE_NAME:
logger.debug('get_attributes: \
The required source %s is not user profile' % source)
return None
attributes = dict()
data = []
try:
user_profile = user.get_profile()
fields = []
if definitions:
for definition in definitions:
logger.debug('get_attributes: looking for %s' % definition)
field_name = get_profile_field_name_from_definition(definition)
if not field_name:
'''
Profile model may be extended without modifying the
mapping file if the attribute name is the same as the
definition
'''
logger.debug('get_attributes: \
field name will be the definition')
field_name = definition
if field_name in user_profile._meta.get_all_field_names():
fields.append((field_name, definition))
else:
logger.debug('get_attributes: Field not found in profile')
else:
fields = [(field_name,
get_definition_from_profile_field_name(field_name)) \
for field_name \
in user_profile._meta.get_all_field_names() \
if get_definition_from_profile_field_name(field_name)]
for field_name, definition in fields:
field = user_profile._meta.get_field_by_name(field_name)[0]
logger.debug('get_attributes: found field %s aka %s' \
% (field_name, field.verbose_name))
value = getattr(user_profile, field_name)
if value:
logger.debug('get_attributes: found value %s' % value)
attr = {}
attr['definition'] = definition
attr['values'] = [value]
data.append(attr)
else:
logger.debug('get_attributes: no value found')
except (SiteProfileNotAvailable, ObjectDoesNotExist):
logger.debug('get_attributes: No user profile')
return None
attributes[SOURCE_NAME] = data
return attributes

View File

@ -0,0 +1,27 @@
'''
VERIDIC Project - Towards a centralized access control system
Copyright (C) 2011 Mikael Ates
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
'''
def urn_to_oid(urn):
_, _, oid = urn.partition('urn:oid:')
return oid
def oid_to_urn(oid):
return 'urn:oid:%s' % oid