196 lines
6.1 KiB
Python
196 lines
6.1 KiB
Python
import urllib
|
|
import logging
|
|
import os
|
|
import json
|
|
import datetime
|
|
import uuid
|
|
|
|
import requests
|
|
from requests.adapters import HTTPAdapter
|
|
from requests.packages.urllib3.util.retry import Retry
|
|
|
|
from django.core.urlresolvers import reverse
|
|
from django.conf import settings
|
|
from django.shortcuts import resolve_url
|
|
|
|
from . import app_settings
|
|
|
|
|
|
def build_logout_url(request, next_url=None):
    """
    Build the URL used to log out from the OP, or return None.

    For now fc_id_token in request.session is used as the flag of an
    active session on the OP. It is set in the login view and deleted in the
    logout return view.

    A fresh random state is always recorded in request.session['fc_states']
    together with the URL to go to afterwards (next_url, or the resolved
    settings.LOGIN_REDIRECT_URL when next_url is empty), even when no logout
    URL is returned.
    """
    target = next_url or resolve_url(settings.LOGIN_REDIRECT_URL)
    state_token = unicode(uuid.uuid4())

    pending_states = request.session.setdefault('fc_states', {})
    # The nested dict is mutated in place, so the session must be flagged
    # explicitly as modified.
    request.session.modified = True
    pending_states[state_token] = {'next': target}

    # Without the flag there is nothing to build.
    if 'fc_id_token' not in request.session:
        return None

    return_url = request.build_absolute_uri(reverse('fc-logout'))
    params = {
        'id_token_hint': request.session.get('fc_id_token_raw'),
        'post_logout_redirect_uri': return_url,
        'state': state_token,
    }
    query_string = urllib.urlencode(params)
    return app_settings.logout_url + '?' + query_string
|
|
|
|
|
|
def get_mapped_attributes(request):
    """
    Return the session's fc_user_info entries renamed through
    app_settings.attributes_mapping, each value wrapped in a one-element list.

    Only names present in fc_user_info are kept; an empty dict is returned
    when the session holds no fc_user_info.
    """
    if 'fc_user_info' not in request.session:
        return {}
    user_info = request.session['fc_user_info']
    return {
        local_name: [user_info[fc_name]]
        for fc_name, local_name in app_settings.attributes_mapping.items()
        if fc_name in user_info
    }
|
|
|
|
|
|
def get_mapped_attributes_flat(request):
    """
    Return the session's fc_user_info entries renamed through
    app_settings.attributes_mapping, keeping each value as is (not wrapped
    in a list).

    Only names present in fc_user_info are kept; an empty dict is returned
    when the session holds no fc_user_info.
    """
    if 'fc_user_info' not in request.session:
        return {}
    user_info = request.session['fc_user_info']
    renamed = (
        (local_name, user_info[fc_name])
        for fc_name, local_name in app_settings.attributes_mapping.items()
        if fc_name in user_info
    )
    return dict(renamed)
|
|
|
|
|
|
def get_ref(ref, user_info):
    """
    Resolve a possibly dotted reference inside nested mappings.

    ref is split on its first '.': the left part selects a sub-container
    with user_info.get(left, {}) and the lookup goes on with the right part.

    Returns None as soon as the container being looked into is not
    mapping-like. Raises KeyError when the last component is missing from
    its mapping (including when an intermediate component is missing, since
    the lookup then continues in an empty dict).
    """
    # Duck-type on `items` rather than on the Python-2-only `iteritems`, so
    # that plain dicts are still recognised as mappings on Python 3.
    if not hasattr(user_info, 'items'):
        return None
    if '.' in ref:
        left, right = ref.split('.', 1)
        return get_ref(right, user_info.get(left, {}))
    return user_info[ref]
|
|
|
|
|
|
def mapping_to_value(mapping, user_info):
    """
    Compute the value described by one user_info mapping.

    The source of the value is the first of these keys found in mapping:
      - 'ref': looked up in user_info with get_ref(),
      - 'value': used as is,
      - 'compute': 'today' (current date) or 'random' (a random UUID string).

    If mapping has a 'translation' key the value is then post-processed:
      - 'insee-communes': resolve_insee_commune(value),
      - 'insee-countries': resolve_insee_country(value),
      - 'isodate': parsed as '%Y-%m-%d' and converted to a date,
      - 'simple': looked up in mapping['translation_simple'], falling back
        to mapping['translation_simple_default'] or '',
      - 'notempty': converted to a boolean.

    Raises ValueError when mapping has no source key (or when 'isodate'
    parsing fails), NotImplementedError on an unknown 'compute' or
    'translation' name, and lets KeyError propagate (missing reference,
    missing 'translation_simple').
    """
    if 'ref' in mapping:
        value = get_ref(mapping['ref'], user_info)
    elif 'value' in mapping:
        value = mapping['value']
    elif 'compute' in mapping:
        if mapping['compute'] == 'today':
            value = datetime.date.today()
        elif mapping['compute'] == 'random':
            value = unicode(uuid.uuid4())
        else:
            raise NotImplementedError(
                'unknown compute %r' % (mapping['compute'],))
    else:
        # Without this branch `value` stayed unbound and the function failed
        # later with UnboundLocalError, which callers do not expect.
        raise ValueError('mapping has no "ref", "value" or "compute" key')

    if 'translation' in mapping:
        if mapping['translation'] == 'insee-communes':
            value = resolve_insee_commune(value)
        elif mapping['translation'] == 'insee-countries':
            value = resolve_insee_country(value)
        elif mapping['translation'] == 'isodate':
            value = datetime.datetime.strptime(value, '%Y-%m-%d').date()
        elif mapping['translation'] == 'simple':
            value = mapping['translation_simple'].get(
                value, mapping.get('translation_simple_default', ''))
        elif mapping['translation'] == 'notempty':
            value = bool(value)
        else:
            raise NotImplementedError(
                'unknown translation %r' % (mapping['translation'],))
    return value
|
|
|
|
|
|
# Content of insee-communes.json, loaded on first use.
_insee_communes = None


def resolve_insee_commune(insee_code):
    """
    Return the entry for insee_code in insee-communes.json, a file located
    next to this module, or 'Code insee inconnu' when the code is unknown.

    The file is read once, on the first call, then kept in the module-level
    _insee_communes variable.
    """
    global _insee_communes
    # `is None` and not truthiness, so an empty file content is cached too
    # instead of being re-read on every call.
    if _insee_communes is None:
        path = os.path.join(os.path.dirname(__file__), 'insee-communes.json')
        # Context manager so that the file is closed once parsed.
        with open(path) as fd:
            _insee_communes = json.load(fd)
    return _insee_communes.get(insee_code, 'Code insee inconnu')
|
|
|
|
|
|
# Content of insee-countries.json, loaded on first use.
_insee_countries = None


def resolve_insee_country(insee_code):
    """
    Return the entry for insee_code in insee-countries.json, a file located
    next to this module, or 'Code insee inconnu' when the code is unknown.

    The file is read once, on the first call, then kept in the module-level
    _insee_countries variable.
    """
    global _insee_countries

    # `is None` and not truthiness, so an empty file content is cached too
    # instead of being re-read on every call.
    if _insee_countries is None:
        path = os.path.join(os.path.dirname(__file__), 'insee-countries.json')
        # Context manager so that the file is closed once parsed.
        with open(path) as fd:
            _insee_countries = json.load(fd)
    return _insee_countries.get(insee_code, 'Code insee inconnu')
|
|
|
|
|
|
def apply_user_info_mappings(user, user_info):
    """
    Update user from user_info following app_settings.user_info_mappings.

    Each entry of the setting maps an attribute name to a mapping; a plain
    string is a shortcut for {'ref': <string>}. The value is computed by
    mapping_to_value(); mappings whose value cannot be computed are logged
    and skipped. The target is, in this order:
      - 'password': set through user.set_password(),
      - an existing attribute of user: set directly,
      - an existing attribute of user.attributes: set on
        user.verified_attributes when the mapping has a true 'verified'
        key, on user.attributes otherwise,
      - anything else is logged as unknown and skipped.

    Optional mapping keys:
      - 'if-empty': do not overwrite a current value which is not empty (for
        the password: a usable password),
      - 'tag': name recorded once the mapping has been applied,
      - 'if-tag': apply the mapping only if this name was already recorded
        by a previous mapping of the same call; this depends on the
        iteration order of the mappings.

    user.save() is called at the end only if the password or a direct
    attribute of user was actually set.
    """
    assert user
    assert user_info

    logger = logging.getLogger(__name__)
    mappings = app_settings.user_info_mappings

    save_user = False
    tags = set()
    for attribute, mapping in mappings.iteritems():
        # normalize mapping to dictionaries
        if isinstance(mapping, basestring):
            mapping = {'ref': mapping}
        try:
            value = mapping_to_value(mapping, user_info)
        except (ValueError, KeyError, NotImplementedError) as e:
            logger.warning(u'auth_fc: cannot apply mapping %s <- %r: %s', attribute, mapping, e)
            continue
        if mapping.get('if-tag') and mapping['if-tag'] not in tags:
            continue

        if attribute == 'password':
            if mapping.get('if-empty') and user.has_usable_password():
                continue
            user.set_password(value)
            save_user = True
        elif hasattr(user, attribute):
            if mapping.get('if-empty') and getattr(user, attribute):
                continue
            setattr(user, attribute, value)
            # Flag the user as dirty only after a real write; it used to be
            # flagged before the if-empty check, triggering useless saves.
            save_user = True
        elif hasattr(user.attributes, attribute):
            if mapping.get('if-empty') and getattr(user.attributes, attribute):
                continue
            if mapping.get('verified', False):
                setattr(user.verified_attributes, attribute, value)
            else:
                setattr(user.attributes, attribute, value)
        else:
            logger.warning(u'auth_fc: unknown attribut in user_info mapping: %s', attribute)
            continue
        # Reached only when the mapping was applied.
        if mapping.get('tag'):
            tags.add(mapping['tag'])
    if save_user:
        user.save()
|
|
|
|
|
|
def requests_retry_session(
    retries=3,
    backoff_factor=0.5,
    status_forcelist=(500, 502, 504),
    session=None,
):
    '''Return a requests session retrying failed requests.

       The same retrying adapter is mounted for http:// and https://; it
       uses `retries` as the total, read and connect retry limits, and
       passes `backoff_factor` and `status_forcelist` to Retry unchanged
       (the delays between attempts are computed by Retry from
       backoff_factor). The session passed as `session` is configured and
       returned; a new requests.Session is created when there is none.
       Proxies come from settings.REQUESTS_PROXIES when this setting exists.
    '''
    if not session:
        session = requests.Session()

    retrying_adapter = HTTPAdapter(
        max_retries=Retry(
            total=retries,
            read=retries,
            connect=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
        )
    )
    for prefix in ('http://', 'https://'):
        session.mount(prefix, retrying_adapter)

    # set proxies
    proxies = getattr(settings, 'REQUESTS_PROXIES', {})
    session.proxies.update(proxies)
    return session
|