This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
authentic2-auth-fc/src/authentic2_auth_fc/utils.py

212 lines
6.9 KiB
Python

# authentic2-auth-fc - authentic2 authentication for FranceConnect
# Copyright (C) 2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import urllib
import logging
import os
import json
import datetime
import uuid
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from django.core.urlresolvers import reverse
from django.conf import settings
from django.shortcuts import resolve_url
from . import app_settings
def build_logout_url(request, next_url=None):
"""
For now fc_id_token in request.session is used as the flag of an
active session on the OP. It is set in the login view and deleted in the
logout return view.
"""
if not next_url:
next_url = resolve_url(settings.LOGIN_REDIRECT_URL)
state = unicode(uuid.uuid4())
states = request.session.setdefault('fc_states', {})
request.session.modified = True
states[state] = {
'next': next_url,
}
if 'fc_id_token' in request.session:
callback = request.build_absolute_uri(reverse('fc-logout'))
qs = {
'id_token_hint': request.session.get('fc_id_token_raw'),
'post_logout_redirect_uri': callback,
'state': state,
}
return app_settings.logout_url + '?' + urllib.urlencode(qs)
return None
def get_mapped_attributes(request):
values = {}
if 'fc_user_info' in request.session:
for fc_name, local_name in app_settings.attributes_mapping.items():
if fc_name in request.session['fc_user_info']:
values[local_name] = [request.session['fc_user_info'][fc_name]]
return values
def get_mapped_attributes_flat(request):
values = {}
if 'fc_user_info' in request.session:
for fc_name, local_name in app_settings.attributes_mapping.items():
if fc_name in request.session['fc_user_info']:
values[local_name] = request.session['fc_user_info'][fc_name]
return values
def get_ref(ref, user_info):
if not hasattr(user_info, 'iteritems'):
return None
if '.' in ref:
left, right = ref.split('.', 1)
return get_ref(right, user_info.get(left, {}))
return user_info[ref]
def mapping_to_value(mapping, user_info):
if 'ref' in mapping:
value = get_ref(mapping['ref'], user_info)
elif 'value' in mapping:
value = mapping['value']
elif 'compute' in mapping:
if mapping['compute'] == 'today':
value = datetime.date.today()
elif mapping['compute'] == 'random':
value = unicode(uuid.uuid4())
else:
raise NotImplementedError
if 'translation' in mapping:
if mapping['translation'] == 'insee-communes':
value = resolve_insee_commune(value)
elif mapping['translation'] == 'insee-countries':
value = resolve_insee_country(value)
elif mapping['translation'] == 'isodate':
value = datetime.datetime.strptime(value, '%Y-%m-%d').date()
elif mapping['translation'] == 'simple':
value = mapping['translation_simple'].get(
value, mapping.get('translation_simple_default', ''))
elif mapping['translation'] == 'notempty':
value = bool(value)
else:
raise NotImplementedError
return value
_insee_communes = None
def resolve_insee_commune(insee_code):
global _insee_communes
if not _insee_communes:
_insee_communes = json.load(
open(
os.path.join(
os.path.dirname(__file__), 'insee-communes.json')))
return _insee_communes.get(insee_code, 'Code insee inconnu')
_insee_countries = None
def resolve_insee_country(insee_code):
global _insee_countries
if not _insee_countries:
_insee_countries = json.load(
open(
os.path.join(
os.path.dirname(__file__), 'insee-countries.json')))
return _insee_countries.get(insee_code, 'Code insee inconnu')
def apply_user_info_mappings(user, user_info):
assert user
assert user_info
logger = logging.getLogger(__name__)
mappings = app_settings.user_info_mappings
save_user = False
tags = set()
for attribute, mapping in mappings.iteritems():
# normalize mapping to dictionaries
if isinstance(mapping, basestring):
mapping = {'ref': mapping}
try:
value = mapping_to_value(mapping, user_info)
except (ValueError, KeyError, NotImplementedError) as e:
logger.warning(u'auth_fc: cannot apply mapping %s <- %r: %s', attribute, mapping, e)
continue
if mapping.get('if-tag') and mapping['if-tag'] not in tags:
continue
if attribute == 'password':
if mapping.get('if-empty') and user.has_usable_password():
continue
save_user = True
user.set_password(value)
elif hasattr(user.attributes, attribute):
if mapping.get('if-empty') and getattr(user.attributes, attribute):
continue
if mapping.get('verified', False):
setattr(user.verified_attributes, attribute, value)
else:
setattr(user.attributes, attribute, value)
elif hasattr(user, attribute):
save_user = True
if mapping.get('if-empty') and getattr(user, attribute):
continue
setattr(user, attribute, value)
else:
logger.warning(u'auth_fc: unknown attribute in user_info mapping: %s', attribute)
continue
if mapping.get('tag'):
tags.add(mapping['tag'])
if save_user:
user.save()
def requests_retry_session(
retries=3,
backoff_factor=0.5,
status_forcelist=(500, 502, 504),
session=None,
):
'''Create a requests session which retries after 0.5s then 1s'''
session = session or requests.Session()
retry = Retry(
total=retries,
read=retries,
connect=retries,
backoff_factor=backoff_factor,
status_forcelist=status_forcelist,
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
# set proxies
session.proxies.update(getattr(settings, 'REQUESTS_PROXIES', {}))
return session