authentic2-auth-fedict/src/authentic2_auth_fedict/adapters.py

214 lines
8.6 KiB
Python

# authentic2_auth_fedict - Fedict authentication for Authentic
# Copyright (C) 2016 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import hashlib
import logging
import os
import time
import requests
from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.files.storage import default_storage
from django.utils.encoding import force_bytes, force_text
import lasso
from mellon.adapters import DefaultAdapter, app_settings
import mellon.utils as mellon_utils
from authentic2.models import Attribute
from authentic2.a2_rbac.utils import get_default_ou
try:
import authentic2.utils.misc as a2_utils_misc
except ImportError:
import authentic2.utils as a2_utils_misc
def check_nrn(nrn):
    """Tell whether the check number of ``nrn`` matches its first nine digits.

    The expected check number is 97 minus the first nine digits (read as an
    integer) modulo 97, with 0 replaced by 97; it is compared to the last
    two characters of ``nrn`` read as an integer.

    Raises ValueError when the inspected parts of ``nrn`` are not numeric.
    """
    leading_digits = int(nrn[:9])
    # "or 97": a computed value of 0 is represented by 97.
    expected = (97 - leading_digits) % 97 or 97
    return expected == int(nrn[-2:])
def check_nrn_y2k(nrn):
    """Tell whether the check number of ``nrn`` matches its '2'-prefixed digits.

    Same computation as a plain modulo 97 check, except that the first nine
    digits are prefixed with a '2' before being read as an integer: the
    expected check number is 97 minus that integer modulo 97, with 0
    replaced by 97, compared to the last two characters of ``nrn``.

    Raises ValueError when the inspected parts of ``nrn`` are not numeric.
    """
    prefixed_digits = int('2' + nrn[:9])
    # "or 97": a computed value of 0 is represented by 97.
    expected = (97 - prefixed_digits) % 97 or 97
    return expected == int(nrn[-2:])
class AuthenticAdapter(DefaultAdapter):
    """Mellon adapter used for Fedict authentication in Authentic.

    It caches identity provider metadata on disk, identifies users by their
    Fedict "fedid" attribute and fills user attributes from the national
    register number (NRN) found in the SAML assertion.
    """

    # maximum age (in seconds) of a metadata cache file before a new download
    # is attempted.
    METADATA_CACHE_DURATION = 86400

    def auth_login(self, request, user):
        """Log ``user`` in through authentic, passing 'fedict' as third argument.

        NOTE(review): the third argument is presumably the authentication
        method identifier; confirm against authentic2's ``login()``.
        """
        a2_utils_misc.login(request, user, 'fedict')

    @staticmethod
    def _read_cached_metadata(path):
        """Return the content of the metadata cache file at ``path``."""
        # NOTE(review): the cache is written as bytes but read back in text
        # mode, decoding therefore relies on the process default encoding.
        with open(path) as fd:
            return force_text(fd.read())

    def get_identity_providers_setting(self):
        """Return the configured identity providers, with metadata loaded.

        For every provider that has a METADATA_URL but no METADATA, the
        metadata is taken from a cache file (named after the SHA-1 of the
        URL, in the "fedict-cache" directory of the default storage) when it
        is non-empty and younger than METADATA_CACHE_DURATION, and downloaded
        otherwise.  When the download fails an older cache file is used if
        there is one, else the provider is left without METADATA.

        The provider dictionaries are modified in place.
        """
        providers = app_settings.IDENTITY_PROVIDERS
        cache_path = default_storage.path('fedict-cache')
        try:
            os.makedirs(cache_path)
        except OSError:
            # the directory already exists, possibly created by another
            # process since the call started; anything else is a real error.
            if not os.path.isdir(cache_path):
                raise

        for idp in providers:
            if 'METADATA_URL' not in idp or 'METADATA' in idp:
                continue
            url_hash = hashlib.sha1(force_bytes(idp['METADATA_URL'])).hexdigest()
            metadata_cache_filename = os.path.join(cache_path, url_hash)

            if os.path.exists(metadata_cache_filename):
                stat_info = os.stat(metadata_cache_filename)
                oldest_accepted = time.time() - self.METADATA_CACHE_DURATION
                if stat_info.st_size and stat_info.st_mtime > oldest_accepted:
                    idp['METADATA'] = self._read_cached_metadata(metadata_cache_filename)
                    continue

            verify_ssl_certificate = mellon_utils.get_setting(idp, 'VERIFY_SSL_CERTIFICATE')
            try:
                # NOTE(review): no timeout is given, an unresponsive server
                # will block the caller; consider adding one.
                response = requests.get(idp['METADATA_URL'], verify=verify_ssl_certificate)
                response.raise_for_status()
            except requests.exceptions.RequestException as e:
                logging.getLogger(__name__).warning(
                    'failed to retrieve metadata from %s (%r)', idp['METADATA_URL'], e
                )
                if os.path.exists(metadata_cache_filename):
                    # accept older cache in case of error
                    idp['METADATA'] = self._read_cached_metadata(metadata_cache_filename)
                continue

            idp['METADATA'] = response.text
            with open(metadata_cache_filename, 'wb') as fd:
                fd.write(response.content)
        return providers

    def lookup_user(self, idp, saml_attributes):
        """Look the user up using the Fedict "fedid" as SAML name id.

        ``saml_attributes`` is modified in place: 'email' is removed, the
        received name id content is kept under 'name_id_content_orig' and
        replaced by the first 'urn:be:fedict:iam:attr:fedid' value, and the
        name id format is set to "unspecified".  The user returned by the
        parent implementation gets the default organizational unit when it
        has none.

        Raises KeyError when the name id content or the fedid attribute is
        missing from ``saml_attributes``.
        """
        # XXX: remove email from received attributes for now, this
        # facilitates emulating Fedict IdP with another authentic.
        saml_attributes.pop('email', None)

        # replace SAML name id content by fedict specific "fedid"
        saml_attributes['name_id_content_orig'] = saml_attributes['name_id_content']
        saml_attributes['name_id_content'] = saml_attributes['urn:be:fedict:iam:attr:fedid'][0]
        saml_attributes['name_id_format'] = lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED

        user = super(AuthenticAdapter, self).lookup_user(idp, saml_attributes)
        # NOTE(review): the parent lookup presumably returns None when no
        # user is found and none is created; do not fail on it.
        if user is not None and not user.ou_id:
            user.ou = get_default_ou()
            user.save()
        return user

    def create_user(self, user_class):
        """Create and return a new ``user_class`` object with default values."""
        return user_class.objects.create()

    def provision_from_nrn(self, user, nrn):
        """Fill ``user`` attributes with the answer of the RRN POP service.

        Nothing is done when the RRN_POP_SERVICE_URL setting is not set.  The
        service is queried at that URL with ``nrn`` appended; connection
        errors, non-200 status codes and invalid JSON bodies are logged and
        the user is left untouched.

        On success the first and last names of ``user`` are overwritten, the
        user is saved, and every known attribute of the mapping is set and
        flagged as verified (missing values are stored as empty strings).
        """
        if not getattr(settings, 'RRN_POP_SERVICE_URL', None):
            return
        logger = logging.getLogger(__name__)
        try:
            # NOTE(review): verify=False disables TLS certificate checking
            # for a call that returns personal data; confirm it is intended.
            response = requests.get(settings.RRN_POP_SERVICE_URL + nrn, verify=False, timeout=5)
        except requests.exceptions.RequestException as e:
            logger.error('error connecting to rrn pop service (%r)', e)
            return
        if response.status_code != 200:
            logger.error('wrong status code from rrn pop service (%s)', response.status_code)
            return
        try:
            attributes = response.json()
        except ValueError as e:
            # a 200 answer with a body that is not JSON
            logger.error('invalid response from rrn pop service (%r)', e)
            return

        if attributes.get('rue'):
            # fix street name
            attributes['rue'] = attributes['rue'].split('(')[0]
        if 'dateNaissance' in attributes:
            attributes['dateNaissance'] = attributes['dateNaissance'].replace('.', '/')
        if 'lieuxMariage' in attributes:
            # change from list to string (a null value is handled as an
            # empty list)
            attributes['lieuxMariage'] = '|'.join(attributes.get('lieuxMariage') or [])
            if not attributes['lieuxMariage']:
                # special case "no wedding"
                attributes['lieuxMariage'] = '-'
        # assume all answers are for Belgium, this may not actually be true
        attributes['pays'] = 'Belgique'

        # set "direct" first/last names attributes, or they'll get overwritten
        # with the value from the SAML assertion and marked as non-verified.
        # (because the NRN only return the first first name, not all of them).
        user.first_name = attributes.get('prenom') or ''
        user.last_name = attributes.get('nom') or ''
        user.save()

        attribute_mapping = [
            ('prenom', 'first_name'),
            ('nom', 'last_name'),
            ('commune', 'city'),
            ('rue', 'street'),
            ('rue', 'address'),  # alternative attribute name
            ('numero', 'num_house'),
            ('codePostal', 'zipcode'),
            ('boite', 'num_box'),
            ('lieuxMariage', 'wedding_cities'),
            ('lieuNaissance', 'birthplace'),
            ('dateNaissance', 'birthdate'),
            ('pays', 'country'),
        ]
        for nrn_attribute, user_attribute in attribute_mapping:
            try:
                Attribute.objects.get(name=user_attribute).set_value(
                    user, attributes.get(nrn_attribute) or '', verified=True
                )
            except Attribute.DoesNotExist:
                # attribute not defined on this platform, skip it
                pass

    @staticmethod
    def _birthdate_from_nrn(nrn):
        """Return the birth date found in ``nrn`` as 'DD/MM/YYYY', or ''.

        An empty string is returned when the date part is '000001' (unknown
        date) or when ``nrn`` passes neither check number computation.
        """
        # the date part is six digits long; the previous comparison used a
        # seven characters literal and could never match.
        if nrn[:6] == '000001':  # unknown date
            return ''
        if check_nrn(nrn):
            century = '19'
        elif check_nrn_y2k(nrn):
            century = '20'
        else:
            return ''
        return '%s/%s/%s%s' % (nrn[4:6], nrn[2:4], century, nrn[:2])

    def provision_attribute(self, user, idp, saml_attributes):
        """Provision ``user`` from the SAML attributes sent by Fedict.

        After the parent provisioning, a user without email is deactivated.
        When an 'egovNRN' attribute is received its first value is stored in
        the 'niss' and 'nrn' attributes (when they exist), and the
        'birthdate' and 'title' attributes are derived from it.  First and
        last names are taken from 'givenName' and 'surname', then additional
        attributes are fetched using the NRN (see ``provision_from_nrn``).
        All values are flagged as verified.

        Raises Attribute.DoesNotExist when one of the 'birthdate', 'title',
        'first_name' or 'last_name' attributes it has to set is not defined.
        """
        super(AuthenticAdapter, self).provision_attribute(user, idp, saml_attributes)
        if not user.email:
            # make sure the account is not usable for now
            user.is_active = False
            user.save()

        nrn = None
        if saml_attributes.get('egovNRN'):
            nrn = saml_attributes['egovNRN'][0]
            for attr_name in ('niss', 'nrn'):
                try:
                    Attribute.objects.get(name=attr_name).set_value(user, nrn, verified=True)
                except Attribute.DoesNotExist:
                    pass

            birthdate = self._birthdate_from_nrn(nrn)
            birthdate_attribute = Attribute.objects.get(name='birthdate')
            try:
                birthdate_attribute.set_value(user, birthdate, verified=True)
            except AttributeError:  # native authentic date field
                try:
                    parsed_birthdate = datetime.datetime.strptime(birthdate, '%d/%m/%Y').date()
                except ValueError:
                    # empty value or incomplete date (ex: "00" as month), it
                    # cannot be stored as a date, leave the attribute as is.
                    parsed_birthdate = None
                if parsed_birthdate is not None:
                    birthdate_attribute.set_value(user, parsed_birthdate, verified=True)

            # an odd sequence number gives "Monsieur", an even one "Madame"
            title = 'Monsieur' if int(nrn[6:9]) % 2 else 'Madame'
            Attribute.objects.get(name='title').set_value(user, title, verified=True)

        if saml_attributes.get('givenName'):
            Attribute.objects.get(name='first_name').set_value(
                user, saml_attributes['givenName'][0], verified=True
            )
        if saml_attributes.get('surname'):
            Attribute.objects.get(name='last_name').set_value(
                user, saml_attributes['surname'][0], verified=True
            )
        user.save()

        if nrn:
            self.provision_from_nrn(user, nrn)