227 lines
7.1 KiB
Python
227 lines
7.1 KiB
Python
import ldap
|
|
import json
|
|
import logging
|
|
import unicodedata
|
|
|
|
from urllib2 import build_opener, urlopen, HTTPHandler, Request, HTTPError
|
|
from random import randint
|
|
from django.utils.translation import ugettext as _
|
|
from django.shortcuts import render
|
|
|
|
rootdn = 'dc=condorcet,dc=dev,dc=entrouvert,dc=org'
|
|
people_base = 'ou=people,'+rootdn
|
|
structures_base = 'ou=structures,'+rootdn
|
|
groups_base = 'ou=Groups,'+rootdn
|
|
scope = ldap.SCOPE_SUBTREE
|
|
|
|
logger = logging.getLogger('sp_sso.resource')
|
|
|
|
sso_attributes = [
|
|
'prenom', 'nom', 'email', 'ep_principal_name', 'ep_primary_affiliation',
|
|
'ep_affiliation', 's_etablissement', 's_entite_affectation_principale',
|
|
's_entite_affectation', 's_emp_corps', 'hote_unite', 'hote_etablissement',
|
|
's_liste_rouge', 'hote_commentaire']
|
|
|
|
sso_select_attributes = [
|
|
'ep_primary_affiliation', 'hote_unite', 'hote_etablissement']
|
|
|
|
sso_tupled_attributes = {
|
|
's_emp_corps': 'EMP_CORPS_CHOICES'
|
|
}
|
|
|
|
sso_needed_description_attributes = [
|
|
's_etablissement', 'ep_primary_affiliation']
|
|
|
|
sso_strict_readonly_attributes = ['s_entite_affectation_principale']
|
|
|
|
supann_host_role_attribute = 'supannRoleGenerique'
|
|
supann_host_role_value = '{SUPANN}R10' # 'Responsable de mission'
|
|
|
|
|
|
def sorting_key(tuple_input):
|
|
if type(tuple_input[1]) is unicode:
|
|
return tuple_input[1].encode('utf-8')
|
|
else:
|
|
return unicodedata.normalize('NFD', unicode(tuple_input[1], 'utf-8'))
|
|
|
|
def render_message(request, message):
|
|
"""Renders a simple message to a base template"""
|
|
return render(request, 'simple_message.html', {'message': message})
|
|
|
|
def generate_eppn(lastname):
|
|
"""
|
|
Used when no eduPersonPrincipalName attribute is fetched from the
|
|
identity provider during the single sign-on process.
|
|
|
|
Returns a randomly generated EPPN value in the form of a valid Campus
|
|
Condorcet email address.
|
|
"""
|
|
return "%s-%06d@campus-condorcet.fr"%(lastname, randint(0,pow(10,6)))
|
|
|
|
def ldap_init():
|
|
# The server's hostname:
|
|
server = "condorcet.dev.entrouvert.org"
|
|
# Admin DN:
|
|
who = "cn=admin,dc=condorcet,dc=dev,dc=entrouvert,dc=org"
|
|
# Credentials: XXX
|
|
cred = "test"
|
|
|
|
try:
|
|
l = ldap.open(server)
|
|
l.simple_bind(who, cred)
|
|
except ldap.LDAPError:
|
|
logger.error('Error while binding to the OpenLDAP server.')
|
|
l = None
|
|
return l
|
|
|
|
|
|
def ldap_get_etablissements():
|
|
"""Used to fill the choices in hote_etablissemnt form ChoiceField."""
|
|
return ldap_get_attribute_from_subtree_nodes(
|
|
structures_base, '(objectClass=supannOrg)', 'supannEtablissement')
|
|
|
|
def ldap_get_code_etablissements():
|
|
return ldap_get_attribute_from_subtree_nodes(
|
|
structures_base, '(objectClass=supannOrg)', 'supannEtablissement')
|
|
|
|
def ldap_get_description_etablissements(code):
|
|
etablissements = ldap_get_attribute_from_subtree_nodes(
|
|
structures_base, '(supannEtablissement=%s)'%code, 'description')
|
|
if len(etablissements) == 2:
|
|
return etablissements[-1][0]
|
|
else:
|
|
return ''
|
|
|
|
def ldap_get_unites():
|
|
"""Used to fill the choices in hote_unite form ChoiceField."""
|
|
return ldap_get_attribute_from_subtree_nodes(
|
|
structures_base, '(!(objectClass=supannOrg))', 'supannCodeEntite')
|
|
|
|
def ldap_get_description_unites(code):
|
|
unites = ldap_get_attribute_from_subtree_nodes(
|
|
structures_base, '(supannCodeEntite=%s)'%code, 'description')
|
|
if len(unites) == 2:
|
|
return unites[-1][0]
|
|
else:
|
|
return ''
|
|
|
|
def ldap_get_affectations():
|
|
"""
|
|
Used to fill the choices in s_entite_affectation_principale form
|
|
ChoiceField.
|
|
"""
|
|
return ldap_get_attribute_from_subtree_nodes(
|
|
structures_base, '(objectClass=supannEntite)', 'supannCodeEntite')
|
|
|
|
|
|
def ldap_get_attribute_from_subtree_nodes(subtree_base, filter, attribute):
|
|
l_handle = ldap_init()
|
|
if not l_handle:
|
|
return []
|
|
|
|
res = l_handle.search(subtree_base, scope, filter)
|
|
rtype, rdata = l_handle.result(res, 1)
|
|
ldap_terminate(l_handle)
|
|
|
|
choices = []
|
|
|
|
# Extra 'null' choice
|
|
choices.append((None, '------------'))
|
|
|
|
for node in rdata:
|
|
node_data = node[1]
|
|
attribute_list = node_data.get(attribute)
|
|
if attribute_list:
|
|
attribute_value = attribute_list[0] # No need for multi-valued attributes
|
|
description = node_data.get('description', [_('No description')])[0]
|
|
choices.append((attribute_value, description))
|
|
|
|
choices.sort(key=sorting_key)
|
|
return choices
|
|
|
|
def ldap_terminate(l):
|
|
l.unbind()
|
|
|
|
def ldap_contains_user(user_data):
|
|
# The way we derive UIDs from the id dict
|
|
# is defined in ldap_craft_uid:
|
|
rdata = None
|
|
if user_data.get('ep_principal_name'):
|
|
#filter = "uid="+user_data['ep_principal_name']
|
|
filter = "eduPersonPrincipalName={}".format(user_data.get('ep_principal_name'))
|
|
logger.info('Filter : {}'.format(filter))
|
|
|
|
l = ldap_init()
|
|
|
|
# Look for a user possessing the same uid:
|
|
res = l.search(people_base, scope, filter, None)
|
|
rtype, rdata = l.result(res, 0)
|
|
|
|
ldap_terminate(l)
|
|
|
|
logger.info('Data : {}'.format(rdata))
|
|
|
|
# Does this user already is in the LDAP directory ?
|
|
return 1 if rdata else 0
|
|
|
|
|
|
def saml_collect_data(request):
|
|
form = {}
|
|
|
|
# Build the SSO operation summary:
|
|
if 'mellon_session' in request.session:
|
|
for attribute in sso_attributes:
|
|
if hasattr(request.user, attribute):
|
|
attribute_element = getattr(request.user, attribute)
|
|
form[attribute] = attribute_element
|
|
|
|
return form
|
|
|
|
def wcs_submit(user_data, posturl):
|
|
opener = build_opener(HTTPHandler)
|
|
# Generate a JSON to bind against the wcs ReST API
|
|
form = {}
|
|
form["data"] = user_data # see wcs documentation
|
|
data = json.dumps(form)
|
|
|
|
req = Request(posturl, data)
|
|
req.add_header("Content-Type", "application/json")
|
|
req.add_header("Accept", "application/json")
|
|
|
|
try:
|
|
opener.open(req)
|
|
except HTTPError, e:
|
|
logger.error('HTTP error %d during WCS form submission'%(e.code))
|
|
|
|
def initial_from_tracking_code(tracking_code):
|
|
if not tracking_code:
|
|
return {}
|
|
base_url = "http://forms-condorcet.dev.entrouvert.org/api/code/"
|
|
tracking_url = base_url+tracking_code
|
|
try:
|
|
req = urlopen(tracking_url)
|
|
response_string = req.read()
|
|
except:
|
|
logger.error('Wrong tracking code.')
|
|
return {}
|
|
|
|
try:
|
|
response_dict = json.loads(response_string)
|
|
form_url = response_dict.get('url')
|
|
form_list = form_url.split('/')
|
|
form_number = form_list.pop()
|
|
req = urlopen(
|
|
'http://forms-condorcet.dev.entrouvert.org/api/forms/invitation/'\
|
|
+form_number)
|
|
initial_response = json.loads(req.read())
|
|
# Add comments aimed at w.c.s. backoffice agents
|
|
initial_response.get('fields', {}).update(
|
|
{'hote_commentaire': 'Identite de l\'invitant : %s %s %s' % (
|
|
initial_response.get('fields', {}).get('hote_prenom'),
|
|
initial_response.get('fields', {}).get('hote_nom'),
|
|
initial_response.get('fields', {}).get('hote_courriel'))})
|
|
except:
|
|
return {}
|
|
|
|
return initial_response.get('fields', {})
|