authentic/src/authentic2_auth_oidc/backends.py

308 lines
13 KiB
Python

# authentic2 - versatile identity manager
# Copyright (C) 2010-2020 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import datetime
import requests
from jwcrypto.jwt import JWT
from jwcrypto.jwk import JWK
from django.utils import six
from django.utils.timezone import now
from django.contrib.auth import get_user_model
from django.contrib.auth.backends import ModelBackend
from django_rbac.utils import get_ou_model
from authentic2.crypto import base64url_encode
from authentic2 import app_settings, hooks
from authentic2.utils.template import Template
from . import models, utils
class OIDCBackend(ModelBackend):
    """Django authentication backend that logs a user in from an OIDC ID token.

    The backend checks the token handed over by the caller (issuer,
    signature key, audience, freshness, nonce), resolves or creates the
    matching local user according to the provider strategy, and then copies
    the provider's claim mappings onto that user.
    """

    @staticmethod
    def _is_templated(claim):
        """Return True when the claim name contains template markup."""
        return '{{' in claim or '{%' in claim

    def _fetch_user_info(self, provider, access_token):
        """Query the provider's userinfo endpoint with a bearer token.

        Returns the decoded JSON body, or None when the HTTP request fails or
        the body is not valid JSON (a warning is logged in both cases).
        """
        logger = logging.getLogger(__name__)
        # NOTE(review): no timeout is passed to requests.get, so a stalled
        # provider blocks this call for as long as requests waits.
        try:
            response = requests.get(provider.userinfo_endpoint,
                                    headers={
                                        'Authorization': 'Bearer %s' % access_token,
                                    })
            response.raise_for_status()
        except requests.RequestException as e:
            logger.warning(u'auth_oidc: failed to retrieve user info %s', e)
            return None
        try:
            return response.json()
        except ValueError as e:
            logger.warning(u'auth_oidc: bad JSON in user info response, %s (%r)', e,
                           response.content)
            return None

    def authenticate(self, request, access_token=None, id_token=None, nonce=None, provider=None):
        """Authenticate a user from an OIDC ID token.

        Args:
            request: the current request; not used by this method.
            access_token: bearer token used to query the userinfo endpoint
                when at least one claim mapping is not an id_token claim.
            id_token: serialized ID token; wrapped in ``utils.IDToken``.
            nonce: when truthy, must equal the ``nonce`` of the ID token.
            provider: provider passed to ``IDToken.deserialize``; it is then
                replaced by the provider looked up from the token issuer.

        Returns:
            The matching (possibly newly created) user, or None when any
            validation step fails or no user can be found or created.
        """
        logger = logging.getLogger(__name__)

        if None in (id_token, provider):
            return None

        original_id_token = id_token
        try:
            id_token = utils.IDToken(id_token)
            id_token.deserialize(provider)
        except utils.IDTokenError as e:
            logger.warning(u'auth_oidc: invalid id_token %s: %s', original_id_token, e)
            return None

        try:
            provider = utils.get_provider_by_issuer(id_token.iss)
        except models.OIDCProvider.DoesNotExist:
            logger.warning(u'auth_oidc: unknown issuer "%s"', id_token.iss)
            return None

        # pick the key material and accepted algorithms for the signature check
        key_or_keyset = None
        if provider.idtoken_algo == models.OIDCProvider.ALGO_RSA:
            key_or_keyset = provider.jwkset
            if not key_or_keyset:
                logger.warning('auth_oidc: idtoken signature algorithm is RSA but '
                               'no JWKSet is defined on provider %s', id_token.iss)
                return None
            algs = ['RS256', 'RS384', 'RS512']
        elif provider.idtoken_algo == models.OIDCProvider.ALGO_HMAC:
            # check the secret before encoding it, otherwise a missing secret
            # raises instead of producing the warning below
            if not provider.client_secret:
                logger.warning('auth_oidc: idtoken signature algorithm is HMAC but '
                               'no client_secret is defined on provider %s', id_token.iss)
                return None
            k = base64url_encode(provider.client_secret.encode('utf-8'))
            key_or_keyset = JWK(kty='oct', k=k.decode('ascii'))
            algs = ['HS256', 'HS384', 'HS512']
        elif provider.idtoken_algo == models.OIDCProvider.ALGO_EC:
            key_or_keyset = provider.jwkset
            if not key_or_keyset:
                logger.warning('auth_oidc: idtoken signature algorithm is EC but '
                               'no JWKSet is defined on provider %s', id_token.iss)
                return None
            algs = ['ES256', 'ES384', 'ES512']

        if key_or_keyset:
            # NOTE(review): exceptions raised by jwcrypto here are not caught
            # and propagate to the caller - confirm this is intended.
            jwt = JWT(jwt=original_id_token,
                      key=key_or_keyset,
                      check_claims={}, algs=algs)
            # the value is unused; presumably reading .claims makes jwcrypto
            # raise if the token was not validated - TODO confirm
            jwt.claims

        # audience checks
        if isinstance(id_token.aud, six.text_type) and provider.client_id != id_token.aud:
            logger.warning(u'auth_oidc: invalid id_token audience %s != provider client_id %s',
                           id_token.aud, provider.client_id)
            return None
        if isinstance(id_token.aud, list):
            if provider.client_id not in id_token.aud:
                logger.warning(u'auth_oidc: invalid id_token audience %s != provider client_id %s',
                               id_token.aud, provider.client_id)
                return None
            if len(id_token.aud) > 1 and 'azp' not in id_token:
                logger.warning(u'auth_oidc: multiple audience and azp not set')
                return None
            # NOTE(review): this also runs for a single-element audience list
            # without azp; behaviour then depends on what IDToken.azp returns.
            if id_token.azp != provider.client_id:
                logger.warning(u'auth_oidc: multiple audience and azp %r does not match client_id'
                               ' %r',
                               id_token.azp, provider.client_id)
                return None

        # freshness check, only when the provider sets a maximum age
        if provider.max_auth_age:
            if not id_token.iat:
                logger.warning('auth_oidc: provider configured for fresh authentication but iat is '
                               'missing from idtoken')
                return None
            duration = now() - id_token.iat
            if duration > datetime.timedelta(seconds=provider.max_auth_age):
                logger.warning('auth_oidc: authentication is too old %s (%s old)', id_token.iat,
                               duration)
                return None

        id_token_nonce = getattr(id_token, 'nonce', None)
        if nonce and nonce != id_token_nonce:
            logger.warning('auth_oidc: id_token nonce %r != expected nonce %r',
                           id_token_nonce, nonce)
            return None

        User = get_user_model()
        user = None
        if provider.strategy == models.OIDCProvider.STRATEGY_FIND_UUID:
            # use the OP sub to find a user by UUID
            # it means OP and RP share the same account base and OP is passing its UUID as sub
            try:
                user = User.objects.get(uuid=id_token.sub, is_active=True)
            except User.DoesNotExist:
                pass
            else:
                logger.info(u'auth_oidc: found user using UUID (=sub) "%s": %s', id_token.sub,
                            user)
        else:
            try:
                user = User.objects.get(oidc_account__provider=provider,
                                        oidc_account__sub=id_token.sub,
                                        is_active=True)
            except User.DoesNotExist:
                pass
            else:
                logger.info(u'auth_oidc: found user using with sub "%s": %s', id_token.sub, user)

        # evaluate the mappings once; they are walked several times below
        claim_mappings = list(provider.claim_mappings.all())
        need_user_info = any(not claim_mapping.idtoken_claim
                             for claim_mapping in claim_mappings)

        user_info = None
        if need_user_info:
            if not access_token:
                logger.warning('auth_oidc: need user info for some claims, but no access token was '
                               'returned')
                return None
            user_info = self._fetch_user_info(provider, access_token)

        # check for required claims
        for claim_mapping in claim_mappings:
            if not claim_mapping.required:
                continue
            claim = claim_mapping.claim
            if self._is_templated(claim):
                logger.warning(u'claim \'%r\' is templated, it cannot be set as required', claim)
            elif claim_mapping.idtoken_claim:
                # an id_token claim is only looked up in the id_token; it must
                # not additionally be required in user_info
                if claim not in id_token:
                    logger.warning(u'auth_oidc: cannot create user missing required claim %r in '
                                   u'id_token (%r)',
                                   claim, id_token)
                    return None
            elif not user_info or claim not in user_info:
                logger.warning(u'auth_oidc: cannot create user missing required claim %r in '
                               u'user_info (%r)', claim, user_info)
                return None

        # map claims to attributes or user fields
        # mapping is done before eventual creation of user as EMAIL_IS_UNIQUE needs to know if the
        # mapping will provide some mail to us
        ou_map = {ou.slug: ou for ou in get_ou_model().cached()}
        user_ou = provider.ou
        save_user = False
        mappings = []
        context = id_token.as_dict(provider)
        if need_user_info:
            context.update(user_info or {})

        for claim_mapping in claim_mappings:
            claim = claim_mapping.claim
            templated = self._is_templated(claim)
            if claim_mapping.idtoken_claim:
                source = id_token
            else:
                # user_info is None when the userinfo request failed; treat it
                # as empty so optional claims are skipped instead of crashing
                source = user_info or {}
            if claim not in source and not templated:
                continue
            verified = False
            attribute = claim_mapping.attribute
            if templated:
                template = Template(claim)
                value = template.render(context=context)
                # xxx missing verification logic for templated claims
            else:
                value = source.get(claim)
                if claim_mapping.verified == models.OIDCClaimMapping.VERIFIED_CLAIM:
                    verified = bool(source.get(claim + '_verified', False))
            if attribute == 'ou__slug' and value in ou_map:
                user_ou = ou_map[value]
                continue
            if claim_mapping.verified == models.OIDCClaimMapping.ALWAYS_VERIFIED:
                verified = True
            mappings.append((attribute, value, verified))

        # find an email in mappings (the last mapped one wins)
        email = None
        for attribute, value, verified in mappings:
            if attribute == 'email':
                email = value

        # possibly create a new user or link to an existing one based on email
        created = False
        linked = False
        if not user:
            if provider.strategy != models.OIDCProvider.STRATEGY_CREATE:
                logger.warning(u'auth_oidc: cannot create user for sub %r as issuer %r does not'
                               u' allow it', id_token.sub, id_token.iss)
                return None
            # only try to link by email when an email was actually mapped,
            # otherwise the lookup could match an unrelated account with an
            # empty email
            if email:
                try:
                    if app_settings.A2_EMAIL_IS_UNIQUE:
                        user = User.objects.get(email=email)
                    elif provider.ou and provider.ou.email_is_unique:
                        user = User.objects.get(ou=provider.ou, email=email)
                except User.DoesNotExist:
                    pass
                else:
                    linked = user is not None
            if not user:
                user = User.objects.create(ou=provider.ou)
                created = True
            # keep the account flag separate so "created" keeps meaning that
            # the user itself was created
            oidc_account, account_created = models.OIDCAccount.objects.get_or_create(
                provider=provider,
                user=user,
                defaults={'sub': id_token.sub})
            if not account_created and oidc_account.sub != id_token.sub:
                logger.info('auth_oidc: changed user %s sub from %s to %s (issuer %s)',
                            user, oidc_account.sub, id_token.sub, id_token.iss)
                oidc_account.sub = id_token.sub
                oidc_account.save()

        if created:
            logger.info(u'auth_oidc: created user %s for sub %s and issuer %s',
                        user, id_token.sub, id_token.iss)
        if linked:
            logger.info(u'auth_oidc: linked user %s to sub %s and issuer %s',
                        user, id_token.sub, id_token.iss)

        # legacy attributes
        for attribute, value, verified in mappings:
            if attribute not in ('username', 'first_name', 'last_name', 'email'):
                continue
            if getattr(user, attribute) != value:
                logger.info(u'auth_oidc: set user %s attribute %s to value %s',
                            user, attribute, value)
                setattr(user, attribute, value)
                if attribute == 'email' and verified:
                    user.email_verified = True
                save_user = True

        if user.ou != user_ou:
            logger.info(u'auth_oidc: set user %s ou to %s',
                        user, user_ou)
            user.ou = user_ou
            save_user = True

        # hooks may modify the user; a truthy result requests a save
        if any(hooks.call_hooks(
                'auth_oidc_backend_modify_user',
                user=user, user_info=user_info, access_token=access_token,
                id_token=id_token, provider=provider)):
            save_user = True

        if save_user:
            user.save()

        # new style attributes
        for attribute, value, verified in mappings:
            if attribute in ('username', 'email'):
                continue
            if attribute in ('first_name', 'last_name') and not verified:
                continue
            if verified:
                setattr(user.verified_attributes, attribute, value)
            else:
                setattr(user.attributes, attribute, value)

        return user

    def get_saml2_authn_context(self):
        """Return lasso's password-protected-transport SAML2 authn context."""
        # imported here rather than at module level, as in the original code
        import lasso
        return lasso.SAML2_AUTHN_CONTEXT_PASSWORD_PROTECTED_TRANSPORT