# authentic/src/authentic2_auth_fc/views.py
#
# 485 lines
# 20 KiB
# Python

import uuid
import requests
import logging
import json
from requests_oauthlib import OAuth2Session
from django.views.generic import View, FormView
from django.views.generic.detail import SingleObjectMixin
from django.http import HttpResponseRedirect, Http404
from django.contrib.auth import authenticate, REDIRECT_FIELD_NAME
from django.contrib.auth.decorators import user_passes_test
from django.contrib import messages
from django.shortcuts import resolve_url, render
from django.utils.translation import ugettext as _
from django.utils.http import is_safe_url, urlencode
from django.conf import settings
from django.core import signing
from django.core.cache import InvalidCacheBackendError, caches
from django.core.exceptions import PermissionDenied
from django.core.urlresolvers import reverse
from django.forms import Form
from django.conf import settings
from authentic2 import app_settings as a2_app_settings
from authentic2 import utils as a2_utils
from . import app_settings, models, utils
# Form class resolved by a2_utils.import_module_or_class from the
# A2_REGISTRATION_SET_PASSWORD_FORM_CLASS setting; UnlinkView uses it when the
# user has no usable password.
SET_PASSWORD_FORM_CLASS = a2_utils.import_module_or_class(
    a2_app_settings.A2_REGISTRATION_SET_PASSWORD_FORM_CLASS)
class LoggerMixin(object):
    '''Mixin giving instances a ``self.logger`` named after this module.'''

    def __init__(self, *args, **kwargs):
        '''Attach the logger, then continue the cooperative ``__init__`` chain.

        All positional and keyword arguments are forwarded unchanged to the
        next class in the MRO (for the views of this module, Django's View).
        '''
        self.logger = logging.getLogger(__name__)
        # The two-argument super() followed by an explicit __init__ call is
        # required: super(LoggerMixin, *args, **kwargs) only built a super
        # object and never ran the parent initializer.
        super(LoggerMixin, self).__init__(*args, **kwargs)
# Use the cache registered under the 'fc' alias; fall back to the 'default'
# cache when looking up 'fc' raises InvalidCacheBackendError.
try:
    cache = caches['fc']
except InvalidCacheBackendError:
    cache = caches['default']
# NOTE(review): neither `cache` nor CACHE_TIMEOUT is referenced in the visible
# part of this module; presumably a timeout in seconds for cached entries --
# confirm against other users before changing.
CACHE_TIMEOUT = 60
def ask_authorization(request, scopes, logger):
    '''Build the authorize URL for the given scope(s) and redirect to it.

    ``scopes`` may be a single scope or a list/tuple of scopes. A fresh random
    value is generated and used both as the OAuth2 ``state`` and as the
    ``nonce``; it is recorded in ``request.session['fc_states']`` together
    with the redirect URI (the absolute URI of the current request).

    Returns an HttpResponseRedirect to ``app_settings.authorize_url``.
    '''
    if isinstance(scopes, (list, tuple)):
        requested = scopes
    else:
        requested = [scopes]
    callback = request.build_absolute_uri()
    token = unicode(uuid.uuid4())
    # Remember where this authorization request must come back to.
    request.session.setdefault('fc_states', {})[token] = {
        'redirect_uri': callback,
    }
    # The nested dict was mutated in place, flag the session as modified.
    request.session.modified = True
    query = {
        'client_id': app_settings.client_id,
        'scope': ' '.join(requested),
        'redirect_uri': callback,
        'response_type': 'code',
        'state': token,
        'nonce': token,
    }
    logger.debug('query string %s', query)
    target = '{0}?{1}'.format(app_settings.authorize_url, urlencode(query))
    logger.debug('redirect to %s', target)
    return HttpResponseRedirect(target)
def resolve_access_token(authorization_code, redirect_uri, logger):
    '''Exchange an authorization_code for an access_token.

    POSTs the code, the client credentials and ``redirect_uri`` to
    ``app_settings.token_url`` (no redirects followed, 3 seconds timeout).

    Returns the decoded JSON body of the response, or None when the request
    fails, the status code is not 200, or the body is not valid JSON. Every
    failure is reported through ``logger``.
    '''
    data = {
        'code': authorization_code,
        'client_id': app_settings.client_id,
        'client_secret': app_settings.client_secret,
        'redirect_uri': redirect_uri,
        'grant_type': 'authorization_code',
    }
    # Never write the client secret to the logs: log a masked copy.
    logger.debug('access token request %s', dict(data, client_secret='********'))
    try:
        response = requests.post(
            app_settings.token_url, data=data,
            verify=app_settings.verify_certificate,
            allow_redirects=False, timeout=3)
    except requests.exceptions.RequestException as e:
        logger.error(u'unable to retrieve access token {}'.format(e))
        return None
    if response.status_code != 200:
        # Prefer the structured OAuth2 error, fall back to the raw body.
        try:
            logger.warning(u'oauth2 error on access token retrieval: %r', response.json())
        except ValueError:
            logger.warning(u'oauth2 error on access token retrieval: %r', response.content)
        return None
    try:
        token = response.json()
    except ValueError:
        logger.warning(
            "no JSON object can be decoded from the data received from {} : '{}'".format(
                app_settings.token_url, response.content))
        return None
    logger.debug('token resolved : {}'.format(token))
    return token
def access_token_from_request(request, logger):
    '''Resolve an access token for a request coming back from the
    authorization endpoint.

    Returns None when the ``code`` or ``state`` query parameter is missing or
    empty, or when ``state`` is not one of the states recorded in the session
    under ``fc_states``. Otherwise returns the result of
    resolve_access_token() called with the redirect URI stored for that state.
    '''
    query = request.GET
    code, state = query.get('code'), query.get('state')
    if not (code and state):
        return
    pending = request.session.get('fc_states', {})
    try:
        entry = pending[state]
    except KeyError:
        # Unknown state: this session did not start that authorization.
        return
    return resolve_access_token(code, entry['redirect_uri'], logger)
# NOTE(review): not referenced in the visible part of this module; confirm it
# is still used elsewhere before removing.
ACCESS_GRANT_CODE = 'accessgrantcode'
class FcOAuthSessionViewMixin(LoggerMixin):
    '''Add the OAuth2 dance to a view.

    ``dispatch`` decides what to do from the query string:

    * ``code`` present: exchange it for a token, validate the id_token and its
      nonce, fetch the user info (and the data of the scopes listed in
      ``fd_scopes`` if any), store everything in the session, then run the
      regular dispatch of the view;
    * ``error`` present: report it and redirect;
    * otherwise: redirect the user to the authorization endpoint.
    '''
    # Scopes requested when app_settings.scopes is empty.
    scopes = ['openid', 'profile', 'birth', 'email']
    redirect_field_name = REDIRECT_FIELD_NAME
    in_popup = False
    token = None

    def get_in_popup(self):
        '''Tell whether the final response must close a popup.'''
        return self.in_popup

    def redirect_to(self, request, *args, **kwargs):
        '''Return the URL to go to once the dance is over.

        The ``redirect_field_name`` request parameter is used when it is a safe
        URL for the current host, settings.LOGIN_REDIRECT_URL otherwise.
        '''
        redirect_to = request.REQUEST.get(self.redirect_field_name, '')
        if not is_safe_url(url=redirect_to, host=request.get_host()):
            redirect_to = resolve_url(settings.LOGIN_REDIRECT_URL)
        return redirect_to

    def close_popup_redirect(self, request, next_url, *args, **kwargs):
        '''Show a page to close the current popup and reload the parent window
           with the return url.
        '''
        return render(request, 'authentic2_auth_fc/close-popup-redirect.html',
                      {'redirect_to': next_url})

    def simple_redirect(self, request, next_url, *args, **kwargs):
        '''Plain HTTP redirect to ``next_url``.'''
        return HttpResponseRedirect(next_url)

    def redirect(self, request, *args, **kwargs):
        '''Redirect to the ``next_url`` keyword argument, or to the result of
        redirect_to() when it is absent, closing the popup if there is one.
        '''
        next_url = kwargs.pop('next_url', None)
        if next_url is None:
            next_url = self.redirect_to(request, *args, **kwargs)
        if self.get_in_popup():
            return self.close_popup_redirect(request, next_url, *args,
                                             **kwargs)
        return self.simple_redirect(request, next_url, *args, **kwargs)

    def redirect_and_come_back(self, request, next_url, *args, **kwargs):
        '''Redirect to ``next_url`` asking it to send the user back here.

        The current path, carrying the original return URL, is passed to
        ``next_url`` in the REDIRECT_FIELD_NAME query parameter.
        '''
        old_next_url = self.redirect_to(request, *args, **kwargs)
        here = '{0}?{1}'.format(
            request.path, urlencode({REDIRECT_FIELD_NAME: old_next_url}))
        there = '{0}{2}{1}'.format(
            next_url, urlencode({REDIRECT_FIELD_NAME: here}),
            '&' if '?' in next_url else '?')
        return self.redirect(request, next_url=there, *args, **kwargs)

    def get_scopes(self):
        '''Return the scopes to request.

        app_settings.scopes (with 'openid' always added) takes precedence over
        the ``scopes`` attribute of the view.
        '''
        if app_settings.scopes:
            return list(set(['openid'] + list(app_settings.scopes)))
        return self.scopes

    def get_ressource(self, url, verify):
        '''GET ``url`` with the OAuth2 session and return the decoded JSON.

        Returns None, after logging the reason, when the request fails or the
        body is not valid JSON. ``self.oauth_session`` is set by dispatch().
        '''
        try:
            data = self.oauth_session().get(url, verify=verify, allow_redirects=False, timeout=3)
            data.raise_for_status()
        except requests.exceptions.RequestException as e:
            self.logger.error(u'unable to retrieve ressource from {} due to {}'.format(url, e))
            return None
        try:
            content = data.json()
        except ValueError:
            self.logger.warning(
                "no JSON object can be decoded from the data received from {} : '{}'".format(
                    url, data.content))
            return None
        self.logger.debug('ressource resolved : {}'.format(content))
        return content

    def get_user_info(self):
        '''Fetch the user info from app_settings.userinfo_url.'''
        return self.get_ressource(app_settings.userinfo_url + '?schema=openid',
                                  app_settings.verify_certificate)

    def get_data(self, scopes=()):
        '''Fetch the data declared in app_settings.fd_list for ``scopes``.

        Returns a dict mapping each scope to a list of ``(name, data)`` pairs,
        one per entry of ``fd_list[scope]`` that could be retrieved. Scopes
        missing from ``fd_list`` are skipped: they come from the query string
        and must not make the view fail.
        '''
        data = dict()
        fd_list = app_settings.fd_list
        if not fd_list:
            return data
        for scope in scopes:
            if scope not in fd_list:
                self.logger.warning(u'no data source configured for scope %s', scope)
                continue
            for fd in fd_list[scope]:
                url = fd['url']
                if fd['query_dic']:
                    url += '?' + urlencode(fd['query_dic'])
                d = self.get_ressource(url, app_settings.verify_certificate)
                if d:
                    data.setdefault(scope, []).append((fd['name'], d))
        return data

    def authorization_error(self, request, *args, **kwargs):
        '''Handle an ``error`` answer of the authorization endpoint.'''
        if request.REQUEST['error'] == 'access_denied':
            messages.warning(request, _('You refused the connection.'))
        self.logger.debug('authorization_error %r', request.GET)
        return self.redirect(request)

    def _connection_failed(self, request):
        '''Tell the user the connection failed and redirect.'''
        messages.warning(request, _('Unable to connect to FranceConnect.'))
        return self.redirect(request)

    def dispatch(self, request, *args, **kwargs):
        '''Interpret the OAuth authorization dance'''
        if 'code' in request.GET:
            self.token = access_token_from_request(request, self.logger)
            # Token request may not be completely processed and result in no
            # token
            if not self.token:
                return self._connection_failed(request)
            # The token request may fail, 'error' is then required.
            # A bad client secret results in error equals to invalid_request
            # for FC and invalid_client for oidc_provider.
            if 'error' in self.token:
                msg = 'token request failed : {}'.format(self.token)
                self.logger.warning(msg)
                return self._connection_failed(request)
            if 'id_token' not in self.token:
                self.logger.warning(u'no id_token in the token response')
                return self._connection_failed(request)
            key = app_settings.client_secret
            if isinstance(key, unicode):
                key = key.encode('utf-8')
            self.id_token, error = models.parse_id_token(
                self.token['id_token'], client_id=app_settings.client_id, client_secret=key)
            if not self.id_token:
                self.logger.warning(u'validation of id_token failed: %s', error)
                return self._connection_failed(request)
            # The nonce sent by ask_authorization() is the state recorded in
            # the session.
            nonce = self.id_token.get('nonce')
            states = request.session.get('fc_states', {})
            if not nonce or nonce not in states:
                self.logger.warning(u'invalid nonce in id_token %s, known ones %s', nonce,
                                    u', '.join(states.keys()))
                return self._connection_failed(request)
            self.logger.debug('fc id_token %s', self.id_token)
            # Expose every claim of the id_token as an attribute of the view.
            for claim in self.id_token:
                setattr(self, claim, self.id_token[claim])
            self.oauth_session = lambda: OAuth2Session(
                app_settings.client_id, token=self.token)
            self.user_info = self.get_user_info()
            if not self.user_info:
                msg = 'userinfo resolution failed : {}'.format(self.token)
                self.logger.warning(msg)
                return self._connection_failed(request)
            self.logger.debug('fc user_info %s', self.user_info)
            self.request.session['fc_id_token'] = self.id_token
            self.request.session['fc_id_token_raw'] = self.token['id_token']
            self.request.session['fc_user_info'] = self.user_info
            if 'fd_scopes' in request.GET:
                scopes = request.GET.get('fd_scopes').split()
                self.data = self.get_data(scopes)
                self.logger.debug('fc data %s', self.data)
                fc_data = self.request.session.setdefault('fc_data', {})
                for scope in self.data:
                    fc_data.setdefault(scope, []).extend(self.data[scope])
                self.logger.debug('fc data in session %s', self.request.session['fc_data'])
            return super(FcOAuthSessionViewMixin, self).dispatch(request,
                                                                 *args,
                                                                 **kwargs)
        elif 'error' in request.GET:
            return self.authorization_error(request, *args, **kwargs)
        else:
            if 'fd_scopes' in request.GET:
                extra_scopes = request.GET.get('fd_scopes').split()
                # Rebind on the instance: extending the class-level list in
                # place would leak these scopes into every later request.
                self.scopes = list(self.scopes) + extra_scopes
            return ask_authorization(request, self.get_scopes(), self.logger)
class PopupViewMixin(object):
    '''Mixin enabling popup mode from the request parameters.'''

    def get_in_popup(self):
        '''Return True when a ``popup`` parameter is present in the request.'''
        parameters = self.request.REQUEST
        return 'popup' in parameters
class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View):
    '''Login with FC, if the FC account is already linked, connect this user,
       if a user is logged link the user to this account, otherwise display an
       error message.
    '''

    def update_user_info(self):
        '''Save the current user_info on the FC account and apply the user
        info mappings to the linked user.
        '''
        self.fc_account.user_info = json.dumps(self.user_info)
        self.fc_account.save()
        utils.apply_user_info_mappings(self.fc_account.user, self.user_info)
        self.logger.debug('updating user_info %s', self.fc_account.user_info)

    def get(self, request, *args, **kwargs):
        '''Request an access grant code and associate it to the current user'''
        # self.sub, self.token and self.user_info are read here; the mixin's
        # dispatch() sets the last two and copies id_token claims onto self.
        registration = 'registration' in request.GET
        if request.user.is_authenticated():
            # Prevent to add a link with an FC account already linked with another user.
            try:
                fc_account = models.FcAccount.objects.get(sub=self.sub, user__is_active=True)
            except models.FcAccount.DoesNotExist:
                pass
            else:
                # Compare by equality, not identity: the user loaded through
                # the FC account is never the same Python object as
                # request.user, even for the same database row.
                if request.user != fc_account.user:
                    msg = 'Attempt to link FC account {} already linked with user {}'
                    self.logger.info(msg.format(self.sub, fc_account.user))
                    messages.info(request,
                                  _('The FranceConnect account {} is already'
                                    ' linked with another account.').format(fc_account))
                    return self.redirect(request)

            # Create the link for this user, or refresh the token of the
            # existing one.
            json_token = json.dumps(self.token)
            self.fc_account, created = models.FcAccount.objects.get_or_create(
                defaults={'token': json_token},
                sub=self.sub, user=request.user)
            if created:
                self.logger.info('fc link created sub %s', self.sub)
                self.update_user_info()
                data = utils.get_mapped_attributes_flat(request)
                if 'email' in data:
                    messages.info(request,
                                  _('Your FranceConnect account {} with '
                                    'email {} has been linked.').format(self.fc_account,
                                                                        data['email']))
                else:
                    messages.info(request, _('Your FranceConnect account {} '
                                             'has been linked.').format(self.fc_account))
            else:
                self.fc_account.token = json_token
                self.fc_account.save()
                self.update_user_info()
                messages.info(request, _('Your local account has been updated.'))
            return self.redirect(request)

        user = authenticate(sub=self.sub, user_info=self.user_info,
                            token=self.token)
        if user:
            a2_utils.login(request, user, 'france-connect')
            self.fc_account = models.FcAccount.objects.get(sub=self.sub, user=user)
            self.update_user_info()
            self.logger.info('logged in using fc sub %s', self.sub)
            return self.redirect(request)

        # No user matches this FC account: register or ask to log in first.
        if registration:
            return self.redirect_and_come_back(request, reverse('fc-registration'))
        messages.info(request, _('If you already have an account, please log in, else '
                                 'create your account.'))
        if app_settings.show_button_quick_account_creation:
            return self.redirect_and_come_back(request, settings.LOGIN_URL)
        return self.redirect_and_come_back(request,
                                           '{0}?nofc=1'.format(settings.LOGIN_URL))
class RegistrationView(LoggerMixin, View):
    '''Start an account registration from the attributes returned by
    utils.get_mapped_attributes_flat() for the current request.
    '''

    def get(self, request, *args, **kwargs):
        '''Redirect to the activation URL when an email is known, otherwise to
        the registration form with the collected attributes in a signed token.
        '''
        attributes = utils.get_mapped_attributes_flat(request)
        attributes['no_password'] = True
        attributes['confirm_data'] = 'required' if app_settings.auto_register else True

        # Only follow the requested return URL when it is safe for this host.
        next_url = request.REQUEST.get(REDIRECT_FIELD_NAME, '')
        if not is_safe_url(url=next_url, host=request.get_host()):
            next_url = resolve_url(settings.LOGIN_REDIRECT_URL)

        if 'email' in attributes:
            attributes['valid_email'] = False
            attributes['franceconnect'] = True
            target = a2_utils.build_activation_url(request,
                                                   next_url=next_url,
                                                   **attributes)
            return HttpResponseRedirect(target)

        # No email: the user has to provide one on the registration form.
        attributes[REDIRECT_FIELD_NAME] = next_url
        messages.info(request, _("FranceConnect didn't provide your email address, please do."))
        return HttpResponseRedirect("{}?token={}".format(reverse('registration_register'),
                                                         signing.dumps(attributes)))
class UnlinkView(LoggerMixin, SingleObjectMixin, FormView):
    '''Confirmation form deleting the link between a user and an FC account.

    When the owner of the link has no usable password, the form displayed is
    SET_PASSWORD_FORM_CLASS and it is saved before the link is deleted.
    '''
    model = models.FcAccount
    template_name = 'authentic2_auth_fc/unlink.html'

    def get_success_url(self):
        '''Account management page, reached through the logout URL built by
        utils.build_logout_url when app_settings.logout_when_unlink is set.
        '''
        account_url = reverse('account_management')
        if not app_settings.logout_when_unlink:
            return account_url
        return utils.build_logout_url(self.request, next_url=account_url)

    def get_form_class(self):
        '''Plain Form, or the set-password form if no password is usable.'''
        if self.fc_account.user.has_usable_password():
            return Form
        return SET_PASSWORD_FORM_CLASS

    def get_form_kwargs(self, **kwargs):
        '''Add the ``user`` keyword argument when no password is usable.'''
        form_kwargs = super(UnlinkView, self).get_form_kwargs(**kwargs)
        owner = self.fc_account.user
        if not owner.has_usable_password():
            form_kwargs['user'] = owner
        return form_kwargs

    def dispatch(self, request, *args, **kwargs):
        '''Load the FC account and check access before the regular dispatch.

        Raises Http404 when the user has no usable password and passwords
        cannot be changed, PermissionDenied (through check_access) when the
        account belongs to someone else.
        '''
        # We prevent unlinking if the user has no usable password and can't change it
        # because we assume that the password is the unique other mean of authentication
        # and unlinking would make the account unreachable.
        still_reachable = (request.user.has_usable_password() or
                           a2_app_settings.A2_REGISTRATION_CAN_CHANGE_PASSWORD)
        if not still_reachable:
            # Prevent access to the view.
            raise Http404
        account = self.get_object()
        self.fc_account = self.object = account
        self.check_access(account)
        return super(UnlinkView, self).dispatch(request, *args, **kwargs)

    def check_access(self, fc_account):
        '''Raise PermissionDenied unless the account is the current user's.'''
        if self.request.user != fc_account.user:
            raise PermissionDenied

    def form_valid(self, form):
        '''Save the new password if one was required, then delete the link.'''
        account = self.fc_account
        if not account.user.has_usable_password():
            form.save()
            self.logger.info(u'user %s has set a password', account.user)
        account.user.backend = 'authentic2.backends.models_backend.ModelBackend'
        # Build the message before the deletion, while the account still
        # holds all its data.
        msg = _('The link with the FranceConnect account {fc_account} has been deleted.').format(
            fc_account=account)
        self.logger.info(u'user %s unlinked from %s', account.user, account)
        account.delete()
        messages.info(self.request, msg)
        return super(UnlinkView, self).form_valid(form)

    def get_context_data(self, **kwargs):
        '''Add ``fc_account``, plus ``no_password`` when none is usable.'''
        ctx = super(UnlinkView, self).get_context_data(**kwargs)
        ctx['fc_account'] = self.fc_account
        if not self.fc_account.user.has_usable_password():
            ctx['no_password'] = True
        return ctx

    def post(self, request, *args, **kwargs):
        '''Go back to account management on ``cancel``, else process the form.'''
        if 'cancel' not in request.POST:
            return super(UnlinkView, self).post(request, *args, **kwargs)
        return a2_utils.redirect(request, 'account_management')
# View callables built from the class-based views above.
login_or_link = LoginOrLinkView.as_view()
registration = RegistrationView.as_view()
unlink = UnlinkView.as_view()
class LogoutReturnView(View):
    '''Clean the FC data out of the session and redirect after a logout.'''

    def get(self, request, *args, **kwargs):
        '''Drop every ``fc_*`` session key handled here, then redirect to the
        ``next`` URL recorded for the ``state`` query parameter, or to the
        ``auth_logout`` view when there is none.
        '''
        state = request.GET.get('state')
        session = request.session
        for session_key in ('fc_id_token', 'fc_id_token_raw', 'fc_user_info', 'fc_data'):
            session.pop(session_key, None)
        known_states = session.pop('fc_states', None)
        target = None
        if known_states and state in known_states:
            target = known_states[state].get('next')
        return HttpResponseRedirect(target or reverse('auth_logout'))
# View callable built from LogoutReturnView.
logout = LogoutReturnView.as_view()