import uuid
import logging
import json
import urlparse
import urllib

import requests
from requests_oauthlib import OAuth2Session

from django.views.generic import View, FormView
from django.views.generic.detail import SingleObjectMixin
from django.http import HttpResponseRedirect, Http404
from django.contrib.auth import authenticate, REDIRECT_FIELD_NAME, get_user_model
from django.contrib import messages
from django.shortcuts import resolve_url, render
from django.utils.translation import ugettext as _
from django.utils.http import is_safe_url, urlencode
from django.conf import settings
from django.core import signing
from django.core.cache import InvalidCacheBackendError, caches
from django.core.exceptions import PermissionDenied
from django.core.urlresolvers import reverse
from django.forms import Form

from authentic2 import app_settings as a2_app_settings
from authentic2 import utils as a2_utils, hooks, constants
from authentic2.a2_rbac.utils import get_default_ou

from . import app_settings, models, utils

# Form used by UnlinkView when the user has no usable password yet.
SET_PASSWORD_FORM_CLASS = a2_utils.import_module_or_class(
    a2_app_settings.A2_REGISTRATION_SET_PASSWORD_FORM_CLASS)


class LoggerMixin(object):
    '''Give the view a module-level logger available as ``self.logger``.'''

    def __init__(self, *args, **kwargs):
        self.logger = logging.getLogger(__name__)
        # Cooperative call so that the next class in the MRO (e.g. View)
        # receives the init kwargs given to as_view().
        super(LoggerMixin, self).__init__(*args, **kwargs)


# Use the dedicated 'fc' cache when it is configured, the default one otherwise.
try:
    cache = caches['fc']
except InvalidCacheBackendError:
    cache = caches['default']

CACHE_TIMEOUT = 60


def ask_authorization(request, scopes, logger):
    '''Compute an authorize URL for obtaining the given scope

    A fresh random state is stored in the session under ``fc_states`` together
    with the current absolute URI, which is used as redirect_uri; the same
    value is sent as nonce. Returns a redirect response to the authorize URL.
    '''
    if not isinstance(scopes, (list, tuple)):
        scopes = [scopes]
    redirect_uri = request.build_absolute_uri()
    state = unicode(uuid.uuid4())
    states = request.session.setdefault('fc_states', {})
    states[state] = {
        'redirect_uri': redirect_uri,
    }
    # The nested dict was mutated in place, flag the session as dirty.
    request.session.modified = True
    params = {
        'client_id': app_settings.client_id,
        'scope': ' '.join(scopes),
        'redirect_uri': redirect_uri,
        'response_type': 'code',
        'state': state,
        'nonce': state,
    }
    logger.debug('query string %s', params)
    url = '{0}?{1}'.format(app_settings.authorize_url, urlencode(params))
    logger.debug('redirect to %s', url)
    response = HttpResponseRedirect(url)
    response.display_message = False
    return response


def resolve_access_token(authorization_code, redirect_uri, logger):
    '''Exchange an authorization_code for an access_token

    Returns the decoded JSON token response, or None when the request fails,
    the status code is not 200 or the body is not JSON.
    '''
    data = {
        'code': authorization_code,
        'client_id': app_settings.client_id,
        'client_secret': app_settings.client_secret,
        'redirect_uri': redirect_uri,
        'grant_type': 'authorization_code',
    }
    # NOTE(review): this debug line includes the client secret in the log
    # output; consider masking it.
    logger.debug('access token request %s', data)
    try:
        session = utils.requests_retry_session()
        response = session.post(
            app_settings.token_url, data=data,
            verify=app_settings.verify_certificate,
            allow_redirects=False, timeout=3)
        if response.status_code != 200:
            try:
                data = response.json()
                logger.warning(u'oauth2 error on access token retrieval: %r', data)
            except ValueError:
                data = {}
                logger.warning(u'oauth2 error on access token retrieval: %r', response.content)
            return
    except requests.exceptions.RequestException as e:
        logger.warning(u'unable to retrieve access token {}'.format(e))
    else:
        try:
            response = response.json()
            logger.debug('token resolved : {}'.format(response))
            return response
        except ValueError:
            logger.warning(
                "no JSON object can be decoded from the data received from {} : '{}'".format(
                    app_settings.token_url, response.content))


def access_token_from_request(request, logger):
    '''Resolve an access token given a request returning from the authorization
       endpoint.

       Returns None when the code or state is missing or when the state is not
       one recorded in the session.
    '''
    code = request.GET.get('code')
    state = request.GET.get('state')
    if not code:
        return
    if not state:
        return
    states = request.session.get('fc_states', {})
    if state not in states:
        return
    # there should not be many FC SSO in flight
    redirect_uri = states[state]['redirect_uri']
    return resolve_access_token(code, redirect_uri, logger)


ACCESS_GRANT_CODE = 'accessgrantcode'


class FcOAuthSessionViewMixin(LoggerMixin):
    '''Add the OAuth2 dance to a view'''
    scopes = ['openid', 'profile', 'birth', 'email']
    redirect_field_name = REDIRECT_FIELD_NAME
    in_popup = False
    token = None

    def get_in_popup(self):
        '''Tell whether the view is displayed inside a popup.'''
        return self.in_popup

    def redirect_to(self, request, *args, **kwargs):
        '''Return the requested next URL, or LOGIN_REDIRECT_URL if it is unsafe.'''
        if request.method == 'POST':
            redirect_to = request.POST.get(self.redirect_field_name,
                                           request.GET.get(self.redirect_field_name, ''))
        else:
            redirect_to = request.GET.get(self.redirect_field_name, '')
        if not is_safe_url(url=redirect_to, host=request.get_host()):
            redirect_to = resolve_url(settings.LOGIN_REDIRECT_URL)
        return redirect_to

    def close_popup_redirect(self, request, next_url, *args, **kwargs):
        '''Show a page to close the current popup and reload the parent window
        with the return url.
        '''
        return render(request, 'authentic2_auth_fc/close-popup-redirect.html',
                      {'redirect_to': next_url})

    def simple_redirect(self, request, next_url, *args, **kwargs):
        '''Plain HTTP redirect to next_url.'''
        return HttpResponseRedirect(next_url)

    def redirect(self, request, *args, **kwargs):
        '''Redirect to the ``next_url`` keyword argument, or to the URL computed
        by redirect_to(), using the popup variant when in a popup.
        '''
        next_url = kwargs.pop('next_url', None)
        if next_url is None:
            next_url = self.redirect_to(request, *args, **kwargs)
        if self.get_in_popup():
            return self.close_popup_redirect(request, next_url, *args, **kwargs)
        else:
            return self.simple_redirect(request, next_url, *args, **kwargs)

    def redirect_and_come_back(self, request, next_url, *args, **kwargs):
        '''Redirect to next_url, asking it to come back to the current path
        afterwards (the original next URL is carried along).
        '''
        old_next_url = self.redirect_to(request, *args, **kwargs)
        here = '{0}?{1}'.format(
            request.path, urlencode({REDIRECT_FIELD_NAME: old_next_url}))
        there = '{0}{2}{1}'.format(
            next_url, urlencode({REDIRECT_FIELD_NAME: here}),
            '&' if '?' in next_url else '?')
        return self.redirect(request, next_url=there, *args, **kwargs)

    def get_scopes(self):
        '''Return the configured scopes (always with 'openid') or the view ones.'''
        if app_settings.scopes:
            return list(set(['openid'] + app_settings.scopes))
        else:
            return self.scopes

    def get_ressource(self, url, verify):
        '''GET url with the OAuth session and return the decoded JSON body.

        Returns None, after logging a warning, on request failure, HTTP error
        status or undecodable body.
        '''
        try:
            data = self.oauth_session().get(url, verify=verify, allow_redirects=False, timeout=3)
            data.raise_for_status()
        except requests.exceptions.RequestException as e:
            self.logger.warning(u'unable to retrieve ressource from {} due to {}'.format(url, e))
        else:
            try:
                data = data.json()
                self.logger.debug('ressource resolved : {}'.format(data))
                return data
            except ValueError:
                self.logger.warning(
                    "no JSON object can be decoded from the data received from {} : '{}'".format(
                        url, data.content))

    def get_user_info(self):
        '''Fetch the userinfo document, None on failure.'''
        return self.get_ressource(app_settings.userinfo_url + '?schema=openid',
                                  app_settings.verify_certificate)

    def get_data(self, scopes=None):
        '''Fetch the data providers configured in app_settings.fd_list for each
        of the given scopes.

        Returns a dict mapping scope to a list of (name, data) pairs; providers
        returning nothing are skipped.
        '''
        data = dict()
        if not app_settings.fd_list:
            return data
        for scope in scopes or ():
            # NOTE(review): a scope absent from fd_list presumably raises
            # KeyError here; callers pass values coming from the query string.
            for fd in app_settings.fd_list[scope]:
                url = fd['url']
                if fd['query_dic']:
                    url += '?' + urlencode(fd['query_dic'])
                d = self.get_ressource(url, app_settings.verify_certificate)
                if d:
                    data.setdefault(scope, []).append((fd['name'], d))
        return data

    def authorization_error(self, request, *args, **kwargs):
        '''Handle an error returned by the authorization endpoint.'''
        if request.method == 'POST':
            error = request.POST.get('error')
        else:
            error = request.GET.get('error')
        if error == 'access_denied':
            messages.warning(request, _('You refused the connection.'))
        self.logger.debug('authorization_error %r', request.GET)
        return self.redirect(request)

    def dispatch(self, request, *args, **kwargs):
        '''Interpret the OAuth authorization dance'''
        if 'code' in request.GET:
            self.token = access_token_from_request(request, self.logger)
            # Token request may not be completely processed and result in no
            # token
            if not self.token:
                messages.warning(request, _('Unable to connect to FranceConnect.'))
                return self.redirect(request)
            # The token request may fail, 'error' is then required.
            # A bad client secret results in error equals to invalid_request
            # for FC and invalid_client for oidc_provider.
            if 'error' in self.token:
                msg = 'token request failed : {}'.format(self.token)
                self.logger.warning(msg)
                messages.warning(request, _('Unable to connect to FranceConnect.'))
                return self.redirect(request)
            key = app_settings.client_secret
            if isinstance(key, unicode):
                key = key.encode('utf-8')
            self.id_token, error = models.parse_id_token(
                self.token['id_token'], client_id=app_settings.client_id, client_secret=key)
            if not self.id_token:
                self.logger.warning(u'validation of id_token failed: %s', error)
                messages.warning(request, _('Unable to connect to FranceConnect.'))
                return self.redirect(request)
            # The nonce sent to the provider is the state, see ask_authorization().
            nonce = self.id_token.get('nonce')
            states = request.session.get('fc_states', {})
            if not nonce or nonce not in states:
                self.logger.warning(u'invalid nonce in id_token %s, known ones %s', nonce,
                                    u', '.join(states.keys()))
                messages.warning(request, _('Unable to connect to FranceConnect.'))
                return self.redirect(request)
            self.logger.debug('fc id_token %s', self.id_token)
            # Expose every id_token claim as an attribute of the view
            # (LoginOrLinkView reads self.sub, presumably set here).
            for claim in self.id_token:
                setattr(self, claim, self.id_token[claim])
            self.oauth_session = lambda: utils.requests_retry_session(
                session=OAuth2Session(
                    app_settings.client_id, token=self.token))
            self.user_info = self.get_user_info()
            if not self.user_info:
                msg = 'userinfo resolution failed : {}'.format(self.token)
                self.logger.warning(msg)
                messages.warning(request, _('Unable to connect to FranceConnect.'))
                return self.redirect(request)
            self.logger.debug('fc user_info %s', self.user_info)
            self.request.session['fc_id_token'] = self.id_token
            self.request.session['fc_id_token_raw'] = self.token['id_token']
            self.request.session['fc_user_info'] = self.user_info
            if 'fd_scopes' in request.GET:
                scopes = request.GET.get('fd_scopes')
                scopes = scopes.split()
                self.data = self.get_data(scopes)
                self.logger.debug('fc data %s', self.data)
                fc_data = self.request.session.setdefault('fc_data', {})
                for scope in self.data:
                    fc_data.setdefault(scope, []).extend(self.data[scope])
                self.logger.debug('fc data in session %s', self.request.session['fc_data'])
            return super(FcOAuthSessionViewMixin, self).dispatch(request, *args, **kwargs)
        elif 'error' in request.GET:
            return self.authorization_error(request, *args, **kwargs)
        else:
            if 'fd_scopes' in request.GET:
                scopes = request.GET.get('fd_scopes')
                scopes = scopes.split()
                # Build a per-instance list: extending self.scopes in place
                # would grow the class attribute shared by every request.
                self.scopes = list(self.scopes) + scopes
            return ask_authorization(request, self.get_scopes(), self.logger)


class PopupViewMixin(object):
    '''Consider the view to be in a popup when 'popup' is in the query string.'''

    def get_in_popup(self):
        return 'popup' in self.request.GET


class LoginOrLinkView(PopupViewMixin, FcOAuthSessionViewMixin, View):
    '''Login with FC, if the FC account is already linked, connect this user,
       if a user is logged link the user to this account, otherwise display an
       error message.
    '''

    def update_user_info(self):
        '''Store the user_info on the FC account and apply the attribute mappings.'''
        self.fc_account.user_info = json.dumps(self.user_info)
        self.fc_account.save()
        utils.apply_user_info_mappings(self.fc_account.user, self.user_info)
        self.logger.debug('updating user_info %s', self.fc_account.user_info)

    def get(self, request, *args, **kwargs):
        '''Request an access grant code and associate it to the current user'''
        registration = 'registration' in request.GET
        self.service_slug = request.GET.get(constants.SERVICE_FIELD_NAME)
        if request.user.is_authenticated():
            # Prevent to add a link with an FC account already linked with another user.
            try:
                fc_account = models.FcAccount.objects.get(sub=self.sub, user__is_active=True)
                # Compare by equality: request.user is a lazy wrapper, so an
                # identity test against a model instance is always true.
                if fc_account.user != request.user:
                    msg = 'Attempt to link FC account {} already linked with user {}'
                    self.logger.info(msg.format(self.sub, fc_account.user))
                    messages.error(request,
                                   _('The FranceConnect account {} is already'
                                     ' linked with another account.').format(fc_account))
                    return self.redirect(request)
            except models.FcAccount.DoesNotExist:
                pass

            # Prevent to add a link to an user which is already linked to an FC account
            if request.user.fc_accounts.exists():
                self.logger.warning(u'cannot link to sub %s, account is already linked to an '
                                    u'FC account', self.sub)
                messages.error(request,
                               _('Your account is already linked to a FranceConnect account'))
                return self.redirect(request)

            json_token = json.dumps(self.token)
            self.fc_account, created = models.FcAccount.objects.get_or_create(
                defaults={'token': json_token},
                sub=self.sub, user=request.user)
            if created:
                self.logger.info('fc link created sub %s', self.sub)
                self.update_user_info()
                data = utils.get_mapped_attributes_flat(request)
                if 'email' in data:
                    messages.info(request,
                                  _('Your FranceConnect account {} with '
                                    'email {} has been linked.').format(self.fc_account,
                                                                         data['email']))
                else:
                    messages.info(request, _('Your FranceConnect account {} '
                                             'has been linked.').format(self.fc_account))
                hooks.call_hooks('event', name='fc-link', user=request.user, sub=self.sub,
                                 request=request)
            else:
                self.fc_account.token = json_token
                self.fc_account.save()
                self.update_user_info()
                messages.info(request, _('Your local account has been updated.'))
            return self.redirect(request)

        default_ou = get_default_ou()
        email_is_unique = a2_app_settings.A2_EMAIL_IS_UNIQUE or default_ou.email_is_unique
        user = authenticate(sub=self.sub, user_info=self.user_info, token=self.token)
        # No linked user: when emails are unique, try to link the FC account to
        # the existing local account with the same email.
        if not user and self.user_info.get('email') and email_is_unique:
            email = self.user_info['email']
            User = get_user_model()
            qs = User.objects.filter(email=email)
            if not a2_app_settings.A2_EMAIL_IS_UNIQUE and default_ou.email_is_unique:
                qs = qs.filter(ou=default_ou)

            if qs.exists():
                # there should not be multiple accounts with the same mail
                if len(qs) > 1:
                    self.logger.error(u'multiple accounts with the same mail %s, %s', email,
                                      list(qs))
                # ok we have one account
                elif len(qs) == 1:
                    user = qs[0]
                    # but does he have already a link to an FC account ?
                    if not user.fc_accounts.exists():
                        fc_account, created = models.FcAccount.objects.get_or_create(
                            defaults={'token': json.dumps(self.token)},
                            sub=self.sub, user=user)
                        if created:
                            self.logger.info(u'fc link created sub %s user %s', self.sub, user)
                            hooks.call_hooks('event', name='fc-link', user=user, sub=self.sub,
                                             request=request)
                        user = authenticate(sub=self.sub, user_info=self.user_info,
                                            token=self.token)
                    else:
                        messages.warning(
                            request,
                            _('Your FranceConnect email address \'%s\' is already used by another '
                              'account, so we cannot create an account for you. Please create an '
                              'account with another email address then link it to FranceConnect '
                              'using your account management page.') % email)
                        return self.redirect(request)
        if user:
            a2_utils.login(request, user, 'france-connect', service_slug=self.service_slug)
            self.fc_account = models.FcAccount.objects.get(sub=self.sub, user=user)
            self.fc_account.token = json.dumps(self.token)
            self.fc_account.save(update_fields=['token'])
            self.update_user_info()
            self.logger.info('logged in using fc sub %s', self.sub)
            return self.redirect(request)
        else:
            if registration:
                return self.redirect_and_come_back(request, reverse('fc-registration'))
            else:
                messages.info(request, _('If you already have an account, please log in, else '
                                         'create your account.'))
                if app_settings.show_button_quick_account_creation:
                    return self.redirect_and_come_back(request, settings.LOGIN_URL)
                else:
                    return self.redirect_and_come_back(request,
                                                       '{0}?nofc=1'.format(settings.LOGIN_URL))


class RegistrationView(LoggerMixin, View):
    '''Start a registration from the attributes mapped from FranceConnect.'''

    def get(self, request, *args, **kwargs):
        data = utils.get_mapped_attributes_flat(request)
        data['no_password'] = True
        if app_settings.auto_register:
            data['confirm_data'] = 'required'
        else:
            data['confirm_data'] = True
        redirect_to = request.GET.get(REDIRECT_FIELD_NAME, '')
        if not is_safe_url(url=redirect_to, host=request.get_host()):
            redirect_to = resolve_url(settings.LOGIN_REDIRECT_URL)
        # Prevent errors when redirect_to does not contain fc-login-or-link view
        parsed_redirect_to = urlparse.urlparse(redirect_to)
        if parsed_redirect_to.path != reverse('fc-login-or-link'):
            redirect_to = '%s?%s=%s' % (
                reverse('fc-login-or-link'),
                REDIRECT_FIELD_NAME,
                urllib.quote(redirect_to))
        if 'email' not in data:
            # Without an email, go through the registration form, carrying the
            # collected data in a signed token.
            data[REDIRECT_FIELD_NAME] = redirect_to
            messages.warning(request,
                             _("FranceConnect didn't provide your email address, please do."))
            return HttpResponseRedirect("{}?token={}".format(reverse('registration_register'),
                                                             signing.dumps(data)))
        data['valid_email'] = False
        data['franceconnect'] = True
        activation_url = a2_utils.build_activation_url(request,
                                                       next_url=redirect_to,
                                                       **data)
        return HttpResponseRedirect(activation_url)


class UnlinkView(LoggerMixin, FormView):
    '''Remove the links between the current user and FranceConnect accounts,
    asking for a new password first when the user has no usable one.
    '''
    template_name = 'authentic2_auth_fc/unlink.html'

    def get_success_url(self):
        url = reverse('account_management')
        if app_settings.logout_when_unlink:
            # logout URL can be None if no session exists with FC
            url = utils.build_logout_url(self.request, next_url=url) or url
        return url

    def get_form_class(self):
        form_class = Form
        if not self.request.user.has_usable_password():
            form_class = SET_PASSWORD_FORM_CLASS
        return form_class

    def get_form_kwargs(self, **kwargs):
        kwargs = super(UnlinkView, self).get_form_kwargs(**kwargs)
        # The set-password form is given the user; the plain Form is not.
        if not self.request.user.has_usable_password():
            kwargs['user'] = self.request.user
        return kwargs

    def dispatch(self, request, *args, **kwargs):
        if not request.user.is_authenticated():
            raise PermissionDenied()
        # We prevent unlinking if the user has no usable password and can't change it
        # because we assume that the password is the unique other mean of authentication
        # and unlinking would make the account unreachable.
        if (not request.user.has_usable_password()
                and not a2_app_settings.A2_REGISTRATION_CAN_CHANGE_PASSWORD):
            # Prevent access to the view.
            raise Http404
        return super(UnlinkView, self).dispatch(request, *args, **kwargs)

    def form_valid(self, form):
        if not self.request.user.has_usable_password():
            form.save()
            self.logger.info(u'user %s has set a password', self.request.user)
        links = models.FcAccount.objects.filter(user=self.request.user)
        for link in links:
            self.logger.info(u'user %s unlinked from %s', self.request.user, link)
        hooks.call_hooks('event', name='fc-unlink', user=self.request.user)
        messages.info(self.request, _('The link with the FranceConnect account has been deleted.'))
        links.delete()
        return super(UnlinkView, self).form_valid(form)

    def get_context_data(self, **kwargs):
        context = super(UnlinkView, self).get_context_data(**kwargs)
        if not self.request.user.has_usable_password():
            context['no_password'] = True
        return context

    def post(self, request, *args, **kwargs):
        if 'cancel' in request.POST:
            return a2_utils.redirect(request, 'account_management')
        return super(UnlinkView, self).post(request, *args, **kwargs)


login_or_link = LoginOrLinkView.as_view()
registration = RegistrationView.as_view()
unlink = UnlinkView.as_view()


class LogoutReturnView(View):
    '''Drop the FC data kept in the session, then redirect to the 'next' URL
    recorded for the returned state or to the logout view.
    '''

    def get(self, request, *args, **kwargs):
        state = request.GET.get('state')
        request.session.pop('fc_id_token', None)
        request.session.pop('fc_id_token_raw', None)
        request.session.pop('fc_user_info', None)
        request.session.pop('fc_data', None)
        states = request.session.pop('fc_states', None)
        next_url = None
        if states and state in states:
            next_url = states[state].get('next')
        if not next_url:
            next_url = reverse('auth_logout')
        return HttpResponseRedirect(next_url)


logout = LogoutReturnView.as_view()