This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
paul-synchro/django/sp_sso/saml/utils.py

206 lines
6.3 KiB
Python

import ldap
import json
import logging
from urllib2 import build_opener, urlopen, HTTPHandler, Request, HTTPError
from random import randint
from django.utils.translation import ugettext as _
from django.shortcuts import render
# Logger shared by the helpers of this module.
logger = logging.getLogger('sp_sso.resource')
# Every directory search in this module walks the whole subtree under its
# base DN.
scope = ldap.SCOPE_SUBTREE
# Directory layout: the root suffix and the branches hanging off it.
rootdn = 'dc=condorcet,dc=dev,dc=entrouvert,dc=org'
people_base = 'ou=people,' + rootdn
structures_base = 'ou=structures,' + rootdn
groups_base = 'ou=Groups,' + rootdn
# Attribute names copied from the SSO session into the registration form
# (see saml_collect_data).
sso_attributes = [
    'prenom',
    'nom',
    'email',
    'ep_principal_name',
    'ep_primary_affiliation',
    'ep_affiliation',
    's_etablissement',
    's_entite_affectation_principale',
    's_entite_affectation',
    's_emp_corps',
    'hote_unite',
    'hote_etablissement',
    's_liste_rouge',
    'hote_commentaire',
]

# Subset of attributes singled out as "select" attributes.
# NOTE(review): presumably the ones rendered as choice widgets -- confirm
# against the form code.
sso_select_attributes = [
    'ep_primary_affiliation',
    'hote_unite',
    'hote_etablissement',
]

# Attribute name -> name of a choices constant.
# NOTE(review): the constant itself is defined outside this module.
sso_tupled_attributes = {'s_emp_corps': 'EMP_CORPS_CHOICES'}

# SupAnn generic-role attribute and the role code used for hosts.
supann_host_role_attribute = 'supannRoleGenerique'
# R10 is labelled 'Responsable de mission' (mission lead).
supann_host_role_value = '{SUPANN}R10'
def render_message(request, message):
    """Render ``message`` through the ``simple_message.html`` template.

    The text is exposed to the template under the ``message`` context key.
    """
    context = {'message': message}
    return render(request, 'simple_message.html', context)
def generate_eppn(lastname):
    """
    Used when no eduPersonPrincipalName attribute is fetched from the
    identity provider during the single sign-on process.

    Returns a randomly generated EPPN value of the form
    ``<lastname>-<six digits>@campus-condorcet.fr``.

    NOTE(review): ``lastname`` is embedded as-is; nothing here strips spaces
    or non-ASCII characters, nor checks the value for uniqueness.
    """
    # randint() is inclusive on both ends: the upper bound must be 999999,
    # otherwise 1000000 can be drawn and the suffix grows to seven digits.
    suffix = randint(0, 10 ** 6 - 1)
    return "%s-%06d@campus-condorcet.fr" % (lastname, suffix)
def craft_user_nickname(mellon_dict):
    """Build a display name from the first ``prenom`` and ``nom`` values.

    Both keys are expected to map to non-empty sequences. The result is
    prefixed with a single space: ``" <prenom> <nom>"``.
    """
    first_name = mellon_dict.get('prenom')[0]
    last_name = mellon_dict.get('nom')[0]
    # The empty first item yields the leading space of the nickname.
    return " ".join(["", first_name, last_name])
def ldap_init():
    """Connect to the OpenLDAP server and bind as the admin account.

    Returns the bound connection object, or ``None`` when connecting or
    binding fails (the failure is logged).
    """
    # The server's hostname:
    server = "condorcet.dev.entrouvert.org"
    # Admin DN:
    who = "cn=admin,dc=condorcet,dc=dev,dc=entrouvert,dc=org"
    # Credentials: XXX hard-coded development password, should be moved to
    # the deployment settings.
    cred = "test"
    try:
        # ldap.open() is deprecated; initialize() with an ldap:// URI targets
        # the same host on the default LDAP port.
        connection = ldap.initialize('ldap://%s' % server)
        # Synchronous bind: the asynchronous simple_bind() only returns a
        # message id, so a rejected bind would never reach the except clause
        # below and callers would receive an unusable handle.
        connection.simple_bind_s(who, cred)
    except ldap.LDAPError:
        logger.error('Error while binding to the OpenLDAP server.')
        return None
    return connection
def ldap_get_etablissements():
    """Build the choices of the hote_etablissement form ChoiceField.

    Collects the ``ou`` attribute of every ``supannOrg`` entry found under
    the structures branch.
    """
    search_filter = '(objectClass=supannOrg)'
    return ldap_get_attribute_from_subtree_nodes(
        structures_base, search_filter, 'ou')
def ldap_get_unites():
    """Build the choices of the hote_unite form ChoiceField.

    Collects the ``ou`` attribute of every entry under the structures branch
    that carries a ``supannTypeEntite`` attribute.
    """
    search_filter = '(supannTypeEntite=*)'
    return ldap_get_attribute_from_subtree_nodes(
        structures_base, search_filter, 'ou')
def ldap_get_affectations():
    """
    Build the choices of the s_entite_affectation_principale form
    ChoiceField.

    Collects the ``supannCodeEntite`` attribute of every ``supannEntite``
    entry found under the structures branch.
    """
    search_filter = '(objectClass=supannEntite)'
    wanted_attribute = 'supannCodeEntite'
    return ldap_get_attribute_from_subtree_nodes(
        structures_base, search_filter, wanted_attribute)
def _choice_sort_key(choice):
    """Return the label of a ``(value, label)`` choice as text, for sorting."""
    label = choice[1]
    if isinstance(label, bytes):
        # Directory values are byte strings, decoded as UTF-8 like the
        # original sort key did. The translated fallback label is already
        # text and must not be decoded a second time (that raised TypeError).
        return label.decode('utf-8', 'replace')
    return label


def ldap_get_attribute_from_subtree_nodes(subtree_base, filter, attribute):
    """Return form choices built from the entries matching ``filter``.

    Searches the subtree rooted at ``subtree_base`` and, for each entry that
    has ``attribute``, emits ``(first attribute value, first description)``.
    Entries without a description get a translated placeholder label.

    The result starts with a ``(None, '------------')`` null choice followed
    by the real choices sorted by label. When no connection can be obtained
    an empty list is returned. Errors raised by the search itself propagate
    to the caller, after the connection has been released.
    """
    l_handle = ldap_init()
    if not l_handle:
        return []
    try:
        res = l_handle.search(subtree_base, scope, filter)
        rdata = l_handle.result(res, 1)[1]
    finally:
        # Release the connection even when the search fails.
        ldap_terminate(l_handle)
    choices = []
    for node in rdata:
        node_data = node[1]
        if not isinstance(node_data, dict):
            # NOTE(review): presumably a search reference rather than a real
            # entry; it carries no attribute dict to read from.
            continue
        attribute_list = node_data.get(attribute)
        if not attribute_list:
            continue
        # No need for multi-valued attributes at the moment
        attribute_value = attribute_list[0]
        description = node_data.get('description', [_('No description')])[0]
        choices.append((attribute_value, description))
    choices.sort(key=_choice_sort_key)
    # Extra 'null' choice, always kept first instead of being sorted in with
    # the real entries.
    return [(None, '------------')] + choices
def ldap_terminate(l):
    """Release the LDAP connection ``l`` by unbinding from the server."""
    l.unbind()
def ldap_contains_user(user_data):
    """Tell whether the directory already holds the user in ``user_data``.

    The lookup searches the people branch for an entry whose
    ``eduPersonPrincipalName`` equals ``user_data['ep_principal_name']``.

    Returns ``1`` when a matching entry is found, ``0`` otherwise. ``0`` is
    also returned when ``user_data`` has no ``ep_principal_name`` or when no
    LDAP connection can be obtained (the latter is logged as an error).
    NOTE(review): callers cannot tell "absent" from "directory unreachable";
    confirm that this is acceptable for the registration flow.
    """
    eppn = user_data.get('ep_principal_name')
    if not eppn:
        return 0
    # Local import: only the top-level ``ldap`` package is imported by the
    # module.
    from ldap.filter import escape_filter_chars
    # The EPPN comes from the SSO session; escape it so that characters such
    # as '*', '(' or ')' cannot alter the search filter.
    search_filter = "eduPersonPrincipalName=%s" % escape_filter_chars(eppn)
    logger.info('Filter : %s', search_filter)
    connection = ldap_init()
    if not connection:
        logger.error('No LDAP connection available for the user lookup.')
        return 0
    try:
        # Look for a user possessing the same EPPN; one result is enough.
        res = connection.search(people_base, scope, search_filter, None)
        rdata = connection.result(res, 0)[1]
    finally:
        # Release the connection even when the search fails.
        ldap_terminate(connection)
    logger.info('Data : %s', rdata)
    # Is this user already in the LDAP directory?
    return 1 if rdata else 0
def saml_collect_data(request):
    """Summarise the SSO attributes stored in the session.

    Reads ``request.session['mellon_session']`` and, for every name listed in
    ``sso_attributes`` that has a non-empty value there, keeps the first
    element of that value. Returns an empty dict when the session holds no
    ``mellon_session`` entry.
    """
    summary = {}
    if 'mellon_session' not in request.session:
        return summary
    mellon_data = request.session['mellon_session']
    for name in sso_attributes:
        values = mellon_data.get(name)
        if not values:
            continue
        # Only the first value of each attribute is kept.
        summary[name] = values[0]
    return summary
def wcs_submit(user_data, posturl):
    """POST ``user_data`` as JSON to the w.c.s. endpoint ``posturl``.

    The payload is ``{"data": user_data}``. HTTP error responses are logged
    and swallowed; any other failure raised while opening the request
    propagates to the caller. Always returns ``None``.
    """
    opener = build_opener(HTTPHandler)
    # Generate a JSON to bind against the wcs ReST API: the form fields go
    # under a top-level "data" key (see wcs documentation).
    # json.dumps() emits ASCII only by default, so encoding it is lossless
    # and gives the request a byte-string body.
    payload = json.dumps({"data": user_data}).encode('utf-8')
    req = Request(posturl, payload)
    req.add_header("Content-Type", "application/json")
    req.add_header("Accept", "application/json")
    try:
        response = opener.open(req)
    except HTTPError as e:
        logger.error('HTTP error %d during WCS form submission', e.code)
    else:
        # The response body is not used; release the connection.
        response.close()
def initial_from_tracking_code(tracking_code):
    """Fetch the initial form values attached to a w.c.s. tracking code.

    Resolves ``tracking_code`` through the ``/api/code/`` endpoint, loads the
    matching invitation form and returns its ``fields`` dict, extended with a
    ``hote_commentaire`` entry naming the inviting person (for the w.c.s.
    backoffice agents).

    Returns an empty dict when ``tracking_code`` is empty, when either HTTP
    request fails, or when a response does not have the expected structure.
    """
    if not tracking_code:
        return {}
    try:
        from urllib import quote  # Python 2
    except ImportError:
        from urllib.parse import quote
    base_url = "http://forms-condorcet.dev.entrouvert.org/api/code/"
    try:
        # The code is user-supplied: quote it so that it stays a single path
        # segment of the API URL.
        tracking_url = base_url + quote(tracking_code, safe='')
        req = urlopen(tracking_url)
        try:
            response_string = req.read()
        finally:
            req.close()
    except Exception:
        logger.error('Wrong tracking code.')
        return {}
    try:
        response_dict = json.loads(response_string)
        form_url = response_dict.get('url')
        # The form number is the last path segment; tolerate a trailing '/'.
        form_number = form_url.rstrip('/').rsplit('/', 1)[-1]
        req = urlopen(
            'http://forms-condorcet.dev.entrouvert.org/api/forms/invitation/'
            + form_number)
        try:
            initial_response = json.loads(req.read())
        finally:
            req.close()
        fields = initial_response.get('fields', {})
    except Exception:
        logger.exception('Unable to load the form behind the tracking code.')
        return {}
    if not isinstance(fields, dict):
        return {}
    if 'fields' in initial_response:
        # Add comments aimed at w.c.s. backoffice agents. Skipped when the
        # response has no 'fields' entry, so the result then stays empty.
        fields['hote_commentaire'] = 'Identite de l\'invitant : %s %s %s' % (
            fields.get('hote_prenom'),
            fields.get('hote_nom'),
            fields.get('hote_courriel'))
    return fields