This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
veridic/acs/signals.py

109 lines
3.9 KiB
Python

import logging
import ldap
from django.dispatch import Signal
from models import UserAlias
from abac.models import LdapSource, AttributeNamespace
logger = logging.getLogger('acs')
attributes_call = Signal(providing_args = ["request", "user"])
def ldap_sources(request, user, **kwargs):
if not user:
logger.error('ldap_source: No user provided')
return {}
logger.debug('ldap_source: Searching attributes for user %s' % user)
sources = LdapSource.objects.all()
if not sources:
logger.debug('ldap_source: No LDAP source configured')
return {}
att_ns = AttributeNamespace.objects.get(friendly_name='LDAP')
attribute_namespace = att_ns.identifier
attributes = {}
for source in sources:
logger.debug('ldap_source: The LDAP source is known as %s' \
% source.name)
'''
Grab the DN
'''
identifier = None
if isinstance(user, UserAlias):
'''
If the request is on a user alias with no django user,
attributes can be retrieved if the alias is an ldap dn
'''
identifier = user.alias
else:
from abac.core import get_identifier_in_source
identifier = get_identifier_in_source(user, source)
if not identifier:
logger.error('ldap_source: No user identifier known into that \
source')
else:
logger.debug('ldap_source: the user is known as %s in source %s' \
% (identifier, source.name))
try:
l = ldap.open(source.server)
l.protocol_version = ldap.VERSION3
username = source.user
password = source.password
l.simple_bind(username, password)
except ldap.LDAPError, err:
logger.error('ldap_source: an error occured at binding due \
to %s' % err)
else:
base_dn = source.base
search_scope = ldap.SCOPE_SUBTREE
retrieve_attributes = None
dn = ldap.dn.explode_dn(identifier,
flags=ldap.DN_FORMAT_LDAPV3)
search_filter = dn[0]
logger.debug('ldap_source: rdn is %s' % search_filter)
data = []
try:
ldap_result_id = l.search(base_dn, search_scope,
search_filter, retrieve_attributes)
result_type, result_data = l.result(ldap_result_id, 0)
logger.debug('ldap_source: result %s %s' % (result_type,
result_data))
for d, dic in result_data:
logger.debug('ldap_source: found %s' % d)
if d == identifier:
logger.debug('ldap_source: Attributes are %s' \
% dic)
for key in dic.keys():
attr = {}
attr['name'] = key
attr['values'] = [\
a.decode('utf-8'). \
encode('ascii', 'ignore') \
for a in dic[key]]
attr['namespace'] = attribute_namespace
data.append(attr)
except ldap.LDAPError, err:
logger.error('ldap_source: an error occured at searching \
due to %s' % err)
else:
if not data:
logger.error('ldap_source: no attribute found')
else:
attributes[source.name] = data
logger.debug('ldap_source: the attributes returned are %s' % attributes)
return attributes
attributes_call.connect(ldap_sources)