202 lines
8.1 KiB
Python
202 lines
8.1 KiB
Python
# authentic2_auth_fedict - Fedict authentication for Authentic
|
|
# Copyright (C) 2016 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import datetime
|
|
import hashlib
|
|
import logging
|
|
import os
|
|
|
|
import requests
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth import get_user_model
|
|
from django.core.files.storage import default_storage
|
|
|
|
import lasso
|
|
|
|
from mellon.adapters import DefaultAdapter, app_settings
|
|
import mellon.utils as mellon_utils
|
|
from authentic2.models import Attribute
|
|
from authentic2 import utils
|
|
from authentic2.a2_rbac.utils import get_default_ou
|
|
|
|
|
|
def check_nrn(nrn):
    """Tell whether the last two digits of *nrn* match its modulo-97 check number.

    The check number is computed from the first nine characters of *nrn*
    read as an integer; a computed value of 0 is replaced by 97.  It is then
    compared with the integer value of the last two characters.

    Raises ValueError when either part of *nrn* is not a valid integer.
    """
    # ``x or 97`` maps the single special case (0) to 97
    expected = (97 - int(nrn[:9])) % 97 or 97
    return expected == int(nrn[-2:])
|
|
|
|
def check_nrn_y2k(nrn):
    """Tell whether *nrn* passes the modulo-97 check with a leading '2'.

    Same computation as the plain check, except that the digit '2' is
    prepended to the first nine characters before they are read as an
    integer.  A computed check number of 0 is replaced by 97, and the result
    is compared with the integer value of the last two characters.

    Raises ValueError when either part of *nrn* is not a valid integer.
    """
    prefixed_number = int('2' + nrn[:9])
    expected = (97 - prefixed_number) % 97 or 97
    return expected == int(nrn[-2:])
|
|
|
|
|
|
class AuthenticAdapter(DefaultAdapter):
    """Mellon adapter used for Fedict authentication in Authentic.

    It overrides the default adapter to log users in through authentic2,
    to load (and cache on disk) identity provider metadata from a URL, to
    key accounts on the Fedict "fedid" attribute, and to fill user
    attributes from the national register number (NRN).
    """

    # maximum age, in seconds, of a cached metadata file before it is fetched
    # again (one day)
    metadata_cache_duration = 86400

    def auth_login(self, request, user):
        """Log *user* in via ``authentic2.utils.login``, passing 'fedict'.

        NOTE(review): 'fedict' is presumably the authentication method name
        recorded by authentic2 -- confirm against ``utils.login``.
        """
        utils.login(request, user, 'fedict')

    @staticmethod
    def _read_cached_metadata(filename):
        """Return the content of the cache file *filename* decoded as UTF-8."""
        # binary mode + explicit decode behaves the same on Python 2 and 3,
        # and the context manager makes sure the file gets closed.
        with open(filename, 'rb') as fd:
            return fd.read().decode('utf-8')

    def get_identity_providers_setting(self):
        """Return the configured identity providers, with metadata loaded.

        For every provider that has a 'METADATA_URL' but no 'METADATA', the
        metadata is taken from a file cache (under the 'fedict-cache'
        directory of the default storage) when the cached copy is non-empty
        and younger than ``metadata_cache_duration`` seconds; otherwise it is
        downloaded and the cache is refreshed.  If the download fails, an
        older cached copy is used when there is one; when there is none the
        provider is left without 'METADATA'.

        The provider dictionaries are modified in place.
        """
        # local import: `time` is not part of the module-level imports
        import time

        providers = app_settings.IDENTITY_PROVIDERS
        cache_path = default_storage.path('fedict-cache')
        if not os.path.exists(cache_path):
            os.makedirs(cache_path)
        for idp in providers:
            if 'METADATA_URL' not in idp or 'METADATA' in idp:
                continue
            # hashlib wants bytes; encode text URLs explicitly
            url_bytes = idp['METADATA_URL']
            if not isinstance(url_bytes, bytes):
                url_bytes = url_bytes.encode('utf-8')
            url_hash = hashlib.sha1(url_bytes).hexdigest()
            metadata_cache_filename = os.path.join(cache_path, url_hash)
            if os.path.exists(metadata_cache_filename):
                stat_info = os.stat(metadata_cache_filename)
                oldest_accepted = time.time() - self.metadata_cache_duration
                if stat_info.st_size and stat_info.st_mtime > oldest_accepted:
                    idp['METADATA'] = self._read_cached_metadata(metadata_cache_filename)
                    continue
            verify_ssl_certificate = mellon_utils.get_setting(idp, 'VERIFY_SSL_CERTIFICATE')
            try:
                # NOTE(review): no timeout is given here, so a stalled
                # metadata server blocks this call indefinitely.
                response = requests.get(idp['METADATA_URL'], verify=verify_ssl_certificate)
                response.raise_for_status()
            except requests.exceptions.RequestException:
                if os.path.exists(metadata_cache_filename):
                    # accept older cache in case of error
                    idp['METADATA'] = self._read_cached_metadata(metadata_cache_filename)
                continue
            idp['METADATA'] = response.text
            with open(metadata_cache_filename, 'wb') as fd:
                fd.write(response.text.encode('utf-8'))
        return providers

    def lookup_user(self, idp, saml_attributes):
        """Look the user up using the Fedict "fedid" as SAML name id.

        *saml_attributes* is modified in place: 'email' is dropped, the
        received name id is kept under 'name_id_content_orig', and
        'name_id_content' / 'name_id_format' are replaced by the first
        'urn:be:fedict:iam:attr:fedid' value and the "unspecified" format.
        The actual lookup is delegated to the parent adapter; a user without
        organizational unit is then attached to the default one.

        Raises KeyError when 'name_id_content' or the fedid attribute is
        missing from *saml_attributes*.
        """
        if 'email' in saml_attributes:
            # XXX: remove email from received attributes for now, this
            # facilitates emulating Fedict IdP with another authentic.
            del saml_attributes['email']
        # replace SAML name id content by fedict specific "fedid"
        saml_attributes['name_id_content_orig'] = saml_attributes['name_id_content']
        saml_attributes['name_id_content'] = saml_attributes['urn:be:fedict:iam:attr:fedid'][0]
        saml_attributes['name_id_format'] = lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED
        user = super(AuthenticAdapter, self).lookup_user(idp, saml_attributes)
        if user is None:
            # NOTE(review): the parent lookup can presumably return no user
            # (e.g. provisioning disabled in django-mellon) -- pass that
            # through instead of failing on ``None.ou_id``.
            return None
        if not user.ou_id:
            user.ou = get_default_ou()
            user.save()
        return user

    def create_user(self, user_class):
        """Create and return a new *user_class* instance with default fields."""
        return user_class.objects.create()

    def provision_from_nrn(self, user, nrn):
        """Fill *user* attributes from the RRN "pop" web service.

        Nothing is done when the ``RRN_POP_SERVICE_URL`` setting is unset.
        Otherwise the service is queried at ``RRN_POP_SERVICE_URL + nrn`` and
        its JSON answer is mapped to the user's first/last name and to
        authentic2 attributes, all marked as verified.  Failures to reach the
        service, non-200 answers and unparsable bodies are logged and
        otherwise ignored; attributes that are not defined in authentic2 are
        skipped.
        """
        if not getattr(settings, 'RRN_POP_SERVICE_URL', None):
            return
        logger = logging.getLogger(__name__)
        try:
            # NOTE(review): certificate verification is disabled for this
            # call (verify=False); kept as-is, but it should be confirmed
            # that this is acceptable for the target service.
            response = requests.get(settings.RRN_POP_SERVICE_URL + nrn,
                                    verify=False, timeout=5)
        except requests.exceptions.RequestException as e:
            logger.error('error connecting to rrn pop service (%r)', e)
            return
        if response.status_code != 200:
            logger.error('wrong status code from rrn pop service (%s)',
                         response.status_code)
            return
        try:
            attributes = response.json()
        except ValueError as e:
            # a body that is not JSON must not break the login
            logger.error('invalid response from rrn pop service (%r)', e)
            return
        if attributes.get('rue'):
            # fix street name
            attributes['rue'] = attributes['rue'].split('(')[0]

        if attributes.get('dateNaissance'):
            attributes['dateNaissance'] = attributes['dateNaissance'].replace('.', '/')

        if 'lieuxMariage' in attributes:
            # change from list to string (a null value counts as an empty list)
            attributes['lieuxMariage'] = '|'.join(attributes['lieuxMariage'] or [])
            if not attributes['lieuxMariage']:
                # special case "no wedding"
                attributes['lieuxMariage'] = '-'

        # assume all answers are for Belgium, this may not actually be true
        attributes['pays'] = 'Belgique'

        # set "direct" first/last names attributes, or they'll get overwritten
        # with the value from the SAML assertion and marked as non-verified.
        # (because the NRN only return the first first name, not all of them).
        user.first_name = attributes.get('prenom') or ''
        user.last_name = attributes.get('nom') or ''
        user.save()

        attribute_mapping = [
            ('prenom', 'first_name'),
            ('nom', 'last_name'),
            ('commune', 'city'),
            ('rue', 'street'),
            ('rue', 'address'),  # alternative attribute name
            ('numero', 'num_house'),
            ('codePostal', 'zipcode'),
            ('boite', 'num_box'),
            ('lieuxMariage', 'wedding_cities'),
            ('lieuNaissance', 'birthplace'),
            ('dateNaissance', 'birthdate'),
            ('pays', 'country'),
        ]
        for nrn_attribute, user_attribute in attribute_mapping:
            try:
                Attribute.objects.get(name=user_attribute).set_value(
                    user,
                    attributes.get(nrn_attribute) or '',
                    verified=True)
            except Attribute.DoesNotExist:
                pass

    def provision_attribute(self, user, idp, saml_attributes):
        """Provision *user* from the SAML assertion and from its NRN.

        After the parent provisioning, an account without email is made
        inactive.  The NRN (first 'egovNRN' value) is stored in the 'niss'
        and 'nrn' attributes when they exist, and the 'birthdate' and 'title'
        attributes are derived from it.  Finally the remaining attributes are
        fetched with ``provision_from_nrn``.

        Raises KeyError when 'egovNRN' is missing from *saml_attributes*, and
        ``Attribute.DoesNotExist`` when the 'birthdate' or 'title' attribute
        is not defined.
        """
        super(AuthenticAdapter, self).provision_attribute(user, idp, saml_attributes)
        if not user.email:
            # make sure the account is not usable for now
            user.is_active = False
            user.save()

        nrn = saml_attributes['egovNRN'][0]
        for attr_name in ('niss', 'nrn'):
            try:
                Attribute.objects.get(name=attr_name).set_value(user, nrn,
                                                                verified=True)
            except Attribute.DoesNotExist:
                pass

        # the first six digits are the birth date as YYMMDD; the century is
        # deduced from which of the two check-number rules is satisfied.
        if nrn[:6] == '000001':  # unknown date
            birthdate = ''
        elif check_nrn(nrn):
            birthdate = '%s/%s/19%s' % (nrn[4:6], nrn[2:4], nrn[:2])
        elif check_nrn_y2k(nrn):
            birthdate = '%s/%s/20%s' % (nrn[4:6], nrn[2:4], nrn[:2])
        else:
            birthdate = ''
        try:
            Attribute.objects.get(name='birthdate').set_value(user, birthdate,
                                                              verified=True)
        except AttributeError:  # native authentic date field
            # an empty birthdate cannot be parsed into a date; in that case
            # there is nothing to store.
            if birthdate:
                birthdate = datetime.datetime.strptime(birthdate, '%d/%m/%Y').date()
                Attribute.objects.get(name='birthdate').set_value(user, birthdate,
                                                                  verified=True)
        # digits 7 to 9 are odd for one title and even for the other
        if int(nrn[6:9]) % 2:
            title = 'Monsieur'
        else:
            title = 'Madame'
        Attribute.objects.get(name='title').set_value(user, title,
                                                      verified=True)

        user.save()

        if nrn:
            self.provision_from_nrn(user, nrn)
|