This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
portail-citoyen/portail_citoyen/apps/msp/views.py

480 lines
18 KiB
Python

import urllib
import requests
import logging
import urlparse
import base64
import uuid
import json
from requests_oauthlib import OAuth2Session
from django.core.urlresolvers import reverse
from django.views.decorators.csrf import csrf_exempt
from django.views.generic import View, RedirectView
from django.views.generic.base import TemplateView
from django.http import HttpResponse, HttpResponseRedirect
from django.contrib.auth import authenticate, login as auth_login, REDIRECT_FIELD_NAME
from django.contrib.auth.views import redirect_to_login
from django.contrib.auth.decorators import login_required, user_passes_test
from django.contrib import messages
from django.shortcuts import resolve_url, render
from django.utils.translation import ugettext as _
from django.utils.http import is_safe_url
from django.conf import settings
from django.core.cache import InvalidCacheBackendError, get_cache
from django.views.decorators.clickjacking import xframe_options_exempt
from . import app_settings, models
def user_has_mspaccount(user):
    '''Tell whether the given user is linked to an MSP account.

    Returns False when reading ``user.mspaccount`` raises
    ``MspAccount.DoesNotExist``; otherwise returns True unless the attribute
    is None.
    '''
    try:
        account = user.mspaccount
    except models.MspAccount.DoesNotExist:
        return False
    return account is not None
# Logger named after this module.
logger = logging.getLogger(__name__)

# View decorator letting through only the users accepted by
# user_has_mspaccount(); '/' is given as the login URL for the others.
mspaccount_required = user_passes_test(user_has_mspaccount, login_url='/')

# Timeout passed to every cache.set() call of this module.
CACHE_TIMEOUT = 60

# Use the 'msp' cache backend when it is configured, the default one otherwise.
try:
    cache = get_cache('msp')
except InvalidCacheBackendError:
    cache = get_cache('default')
def ask_authorization(request, scopes):
    '''Redirect to the authorization endpoint to request the given scopes.

    `scopes` is either a list/tuple of scopes or a single scope. The absolute
    URI of the current request is sent as the redirect URI and, base64
    encoded, as the `state` parameter.
    '''
    if isinstance(scopes, (list, tuple)):
        scope_list = scopes
    else:
        scope_list = [scopes]
    here = request.build_absolute_uri()
    query = {
        'client_id': app_settings.client_id,
        'scope': ' '.join(scope_list),
        'redirect_uri': here,
        'response_type': 'code',
        'state': base64.b64encode(here),
    }
    authorize_url = app_settings.authorize_url
    return HttpResponseRedirect(
        '{0}?{1}'.format(authorize_url, urllib.urlencode(query)))
def resolve_access_token(authorization_code, redirect_uri):
    '''Exchange an authorization code at the token endpoint.

    The client credentials from the application settings are posted along
    with the code. The decoded JSON body of the response is returned; the
    HTTP status of the response is not checked here.
    '''
    payload = {
        'code': authorization_code,
        'client_id': app_settings.client_id,
        'client_secret': app_settings.client_secret,
        'redirect_uri': redirect_uri,
        'grant_type': 'authorization_code',
    }
    token_response = requests.post(
        app_settings.token_url,
        data=payload,
        verify=app_settings.verify_certificate,
        cert=app_settings.client_certificate)
    return token_response.json()
def access_token_from_request(request):
    '''Resolve an access token for a request coming back from the
    authorization endpoint.

    The redirect URI is decoded from the base64 `state` query parameter when
    there is one, and read from the `redirect_uri` query parameter otherwise.
    '''
    query = request.GET
    authorization_code = query['code']
    if 'state' not in query:
        redirect_uri = query['redirect_uri']
    else:
        redirect_uri = base64.b64decode(query.get('state'))
    return resolve_access_token(authorization_code, redirect_uri)
# Key of the access grant code in the decoded JSON result of the
# 'app/rest/agc' API call.
ACCESS_GRANT_CODE = 'accessgrantcode'
class MspOAuthSessionViewMixin(object):
    '''View mixin obtaining an OAuth2 token before running the view.

    A request without `code` nor `error` is redirected to the authorization
    endpoint for the scopes returned by get_scopes(). A request carrying a
    `code` gets its token resolved into `self.token`, and an
    `oauth_session()` factory is set up before the regular dispatch.
    '''
    scopes = []
    redirect_field_name = REDIRECT_FIELD_NAME
    in_popup = False
    token = None

    def get_in_popup(self):
        '''Tell whether responses must be produced for a popup window.'''
        return self.in_popup

    def redirect_to(self, request, *args, **kwargs):
        '''Return the URL named by the redirect field of the request.

        settings.LOGIN_REDIRECT_URL is used instead when that URL is not
        considered safe for the host of the request.
        '''
        target = request.REQUEST.get(self.redirect_field_name, '')
        if is_safe_url(url=target, host=request.get_host()):
            return target
        return resolve_url(settings.LOGIN_REDIRECT_URL)

    def close_popup_redirect(self, request, next_url, *args, **kwargs):
        '''Render the page closing the current popup and reloading the
        parent window with the return URL.
        '''
        context = {'redirect_to': next_url}
        return render(request, 'msp/close-popup-redirect.html', context)

    def simple_redirect(self, request, next_url, *args, **kwargs):
        '''Answer with a plain HTTP redirect to next_url.'''
        return HttpResponseRedirect(next_url)

    def redirect(self, request, *args, **kwargs):
        '''Redirect to the `next_url` keyword argument, or to the result of
        redirect_to() when it is absent, honoring the popup mode.
        '''
        next_url = kwargs.pop('next_url', None)
        if next_url is None:
            next_url = self.redirect_to(request, *args, **kwargs)
        responder = (self.close_popup_redirect if self.get_in_popup()
                     else self.simple_redirect)
        return responder(request, next_url, *args, **kwargs)

    def redirect_and_come_back(self, request, next_url, *args, **kwargs):
        '''Redirect to next_url, asking it to send the user back to the
        current path, which itself keeps the original return URL.
        '''
        final_url = self.redirect_to(request, *args, **kwargs)
        come_back_url = '{0}?{1}'.format(
            request.path,
            urllib.urlencode({REDIRECT_FIELD_NAME: final_url}))
        come_back_query = urllib.urlencode({REDIRECT_FIELD_NAME: come_back_url})
        separator = '&' if '?' in next_url else '?'
        detour_url = '{0}{1}{2}'.format(next_url, separator, come_back_query)
        return self.redirect(request, next_url=detour_url, *args, **kwargs)

    def get_scopes(self):
        '''Return the scopes to request at the authorization endpoint.'''
        return self.scopes

    def api_call(self, api_path, method='get', **kwargs):
        '''Call the API at api_path, relative to the configured API URL,
        through a new OAuth2 session; `method` names the session method.
        '''
        url = urlparse.urljoin(app_settings.api_url, api_path)
        send = getattr(self.oauth_session(), method)
        return send(url,
                    verify=app_settings.verify_certificate,
                    cert=app_settings.client_certificate,
                    **kwargs)

    def authorization_error(self, request, *args, **kwargs):
        '''Handle an error returned by the authorization endpoint; a refusal
        by the user is reported through a warning message.
        '''
        error = request.REQUEST['error']
        if error == 'access_denied':
            messages.warning(request, _('request denied by user'))
        logger.debug('authorization_error %r', request.GET)
        return self.redirect(request)

    def dispatch(self, request, *args, **kwargs):
        '''Interpret the OAuth authorization dance'''
        query = request.GET
        if 'code' not in query:
            if 'error' in query:
                return self.authorization_error(request, *args, **kwargs)
            return ask_authorization(request, self.get_scopes())
        self.token = access_token_from_request(request)
        # A session is only built when an API call actually needs one.
        self.oauth_session = lambda: OAuth2Session(
            app_settings.client_id, token=self.token)
        return super(MspOAuthSessionViewMixin, self).dispatch(
            request, *args, **kwargs)
class PopupViewMixin(object):
    '''Mixin switching to popup mode from a `popup` request parameter.'''

    def get_in_popup(self):
        '''Return True when `popup` is among the request parameters.'''
        request_params = self.request.REQUEST
        return 'popup' in request_params
class LoginView(PopupViewMixin, MspOAuthSessionViewMixin, View):
    '''Log a user in from the access grant code of their MSP account.'''
    scopes = ['GET_AGC', 'DELETE_AGC']

    def dispatch(self, request, *args, **kwargs):
        '''Redirect already authenticated users right away; run the OAuth
        flow for the others.
        '''
        if not request.user.is_authenticated():
            return super(LoginView, self).dispatch(request, *args, **kwargs)
        return self.redirect(request)

    def get(self, request, *args, **kwargs):
        '''Request an access grant code and authenticate with it'''
        agc = self.api_call('app/rest/agc').json()[ACCESS_GRANT_CODE]
        user = authenticate(agc=agc)
        if not user:
            messages.warning(request, _('no local account is linked to your MSP account'))
        else:
            auth_login(request, user)
        return self.redirect(request)


login = LoginView.as_view()
class LinkView(PopupViewMixin, MspOAuthSessionViewMixin, View):
    '''Link the MSP account reached through OAuth to the current user.'''
    scopes = ['GET_AGC', 'DELETE_AGC']

    def dispatch(self, request, *args, **kwargs):
        '''Login user between authorization and access grant code request'''
        came_back_with_code = 'code' in request.GET
        if came_back_with_code and not request.user.is_authenticated():
            return redirect_to_login(request.get_full_path())
        return super(LinkView, self).dispatch(request, *args, **kwargs)

    def get(self, request, *args, **kwargs):
        '''Request an access grant code and associate it to the current user'''
        agc = self.api_call('app/rest/agc').json()[ACCESS_GRANT_CODE]
        accounts = models.MspAccount.objects
        try:
            accounts.get(agc=agc)
        except models.MspAccount.DoesNotExist:
            accounts.create(user=request.user, agc=agc,
                            token=json.dumps(self.token))
            messages.info(request, _('msp link created'))
        else:
            messages.info(request, _('msp link already exists'))
        return self.redirect(request)


link = LinkView.as_view()
class UnlinkView(PopupViewMixin, MspOAuthSessionViewMixin, TemplateView):
    '''Remove the MSP links of the current user.'''
    scopes = ['DELETE_AGC']

    def get(self, request, *args, **kwargs):
        '''Delete the access grant code through the API and, when the API
        answers with a 2xx status, the local links of the user.
        '''
        links = models.MspAccount.objects.filter(user=request.user)
        if not links.exists():
            logger.debug('no msp account exist doing nothing')
            return self.redirect(request)
        response = self.api_call('app/rest/agc', method='delete')
        if response.status_code // 100 != 2:
            logger.debug('error when deleting msp link %r', response.content)
            return self.redirect(request)
        links.delete()
        logger.debug('all link deleted for %r', request.user)
        # Go through the "unlink done" page, handing it the return URL.
        back_url = self.redirect_to(request, *args, **kwargs)
        done_path = reverse('msp-unlink-done')
        done_url = '{0}?{1}'.format(
            done_path, urllib.urlencode({REDIRECT_FIELD_NAME: back_url}))
        return self.redirect(request, next_url=done_url)


unlink = login_required(UnlinkView.as_view())
class AuthorizeView(View):
    '''Authorization endpoint relaying authorization requests to MSP.

    Three kinds of GET requests are handled:

    - a request with neither `code` nor `error`: the client's `redirect_uri`,
      `scope` and `state` are saved in the cache under a fresh state and the
      browser is redirected to the MSP authorization endpoint;
    - a request with a `code`: the MSP code is saved in the cache under a
      fresh code, and the browser is redirected to the client's redirect URI
      with this new code and the client's own state;
    - a request with an `error`: the browser is redirected to the client's
      redirect URI with the received parameters and the client's own state.

    Invalid requests, and returns whose saved state is no longer in the
    cache, are answered with an HTTP 400 response.
    '''

    def get(self, request, *args, **kwargs):
        '''Route the request to the matching step of the relay.'''
        GET = request.GET
        if 'code' in GET or 'error' in GET:
            return self._back_to_client(request)
        return self._forward_to_msp(request)

    @staticmethod
    def _bad_request(reason):
        '''Build the plain text HTTP 400 response for an invalid request.'''
        return HttpResponse(reason, status=400, content_type='text/plain')

    def _back_to_client(self, request):
        '''Redirect a `code` or `error` return to the client's redirect URI.'''
        GET = request.GET
        if 'state' not in GET:
            return self._bad_request('missing state parameter')
        state = GET['state']
        saved = cache.get('msp-state-' + state)
        if saved is None:
            # Saved states are stored with CACHE_TIMEOUT, so the entry may be
            # gone by the time the browser comes back.
            logger.warning('%s: unknown or expired state', state)
            return self._bad_request('unknown or expired state')
        redirect_uri = saved['redirect_uri']
        old_state = saved['state']
        parsed = urlparse.urlparse(redirect_uri)
        params = urlparse.parse_qs(parsed.query)
        params.update(dict(GET))
        if 'code' in GET:
            # Hide the MSP code behind a code of our own; the token endpoint
            # finds the real one in the cache.
            new_code = str(uuid.uuid4())
            cache.set('msp-code-' + new_code, {
                'msp_code': GET['code'],
                'msp_redirect_uri': request.build_absolute_uri(request.path),
                'redirect_uri': redirect_uri,
                'scope': saved['scope'],
            }, CACHE_TIMEOUT)
            params['code'] = new_code
        # Give the client its own state back, never the one sent to MSP.
        if old_state:
            params['state'] = old_state
        else:
            del params['state']
        parsed = parsed._replace(query=urllib.urlencode(params, True))
        return HttpResponseRedirect(urlparse.urlunparse(parsed))

    def _forward_to_msp(self, request):
        '''Save a client authorization request and redirect it to MSP.'''
        GET = request.GET
        for name in ('redirect_uri', 'scope', 'response_type'):
            if name not in GET:
                return self._bad_request('missing %s parameter' % name)
        if GET['response_type'] != 'code':
            return self._bad_request('unsupported response_type')
        redirect_uri = request.build_absolute_uri(request.path)
        state = str(uuid.uuid4())
        logger.debug('%s: received MSP authorization demande %r', state, GET)
        # save state
        cache.set('msp-state-' + state, {
            'msp_redirect_uri': redirect_uri,
            'scope': GET['scope'].split(),
            'redirect_uri': GET['redirect_uri'],
            'state': GET.get('state')
        }, CACHE_TIMEOUT)
        params = dict(GET)
        params['state'] = state
        params['client_id'] = app_settings.client_id
        params['redirect_uri'] = redirect_uri
        url = '{0}?{1}'.format(app_settings.authorize_url,
                               urllib.urlencode(params, True))
        return HttpResponseRedirect(url)


authorize = xframe_options_exempt(AuthorizeView.as_view())
class AccessToken(View):
    '''Token endpoint trading a code found in the cache for a bearer token.

    The posted code is looked up under the `msp-code-` cache prefix. The MSP
    code saved with it is exchanged through resolve_access_token(); when a
    token is obtained, it is kept in the cache and the client receives an
    opaque token referring to it. Any other result is relayed as is.
    '''

    class _RequestError(AssertionError):
        '''A rejected token request, with the OAuth2 error code to report.

        It derives from AssertionError so that callers of access_token()
        catching AssertionError on a failed check keep working.
        '''

        def __init__(self, error, description):
            AssertionError.__init__(self, description)
            self.error = error
            self.description = description

    def post(self, request, *args, **kwargs):
        '''Answer a token request, reporting rejected requests as JSON.

        A rejected request gets an HTTP 400 response whose JSON body has an
        `error` and an `error_description` member.
        '''
        try:
            return self.access_token(request, *args, **kwargs)
        except self._RequestError as e:
            logger.warning('access token request rejected: %s (%s)',
                           e.error, e.description)
            result = {
                'error': e.error,
                'error_description': e.description,
            }
            return HttpResponse(json.dumps(result),
                                content_type='application/json',
                                status=400)

    def access_token(self, request, *args, **kwargs):
        '''Validate the token request and build the token response.

        Raises _RequestError, an AssertionError, when the request is
        rejected. The checks are explicit tests rather than assert
        statements, so they still run when Python is started with -O.
        '''
        POST = request.POST
        required = ('code', 'client_id', 'client_secret', 'redirect_uri',
                    'grant_type')
        for name in required:
            if name not in POST:
                raise self._RequestError('invalid_request',
                                         'missing %s parameter' % name)
        if POST['grant_type'] != 'authorization_code':
            raise self._RequestError('unsupported_grant_type',
                                     'grant_type must be authorization_code')
        credentials = (POST['client_id'], POST['client_secret'])
        if credentials not in app_settings.client_credentials:
            raise self._RequestError('invalid_client',
                                     'unknown client credentials')
        state = cache.get('msp-code-' + POST['code'])
        if state is None or 'redirect_uri' not in state:
            raise self._RequestError('invalid_grant',
                                     'unknown or expired code')
        if state['redirect_uri'] != POST['redirect_uri']:
            raise self._RequestError('invalid_grant',
                                     'redirect_uri mismatch')
        result = resolve_access_token(state['msp_code'],
                                      state['msp_redirect_uri'])
        if 'access_token' in result:
            # Keep the MSP token in the cache; the client only gets an
            # opaque reference to it.
            token = str(uuid.uuid4())
            cache.set('msp-access-token-' + token, {
                'token': result,
                'scope': state['scope'],
            }, CACHE_TIMEOUT)
            new_result = {
                'access_token': base64.b64encode(token),
                'token_type': 'Bearer',
            }
            if 'expires_in' in result:
                new_result['expires_in'] = result['expires_in']
            result = new_result
        return HttpResponse(json.dumps(result),
                            content_type='application/json')


access_token = csrf_exempt(AccessToken.as_view())
class BearerTokenUnauthorized(HttpResponse):
    '''401 response carrying a `Bearer` WWW-Authenticate challenge.'''
    status_code = 401

    def __init__(self, error=None):
        '''Build the response; a truthy `error` is added to the challenge
        as its `error` attribute.
        '''
        super(BearerTokenUnauthorized, self).__init__('Unauthorized')
        challenge = 'Bearer'
        if error:
            challenge = challenge + ' error="%s"' % error
        self['WWW-Authenticate'] = challenge
class OAuth2ProxyView(View):
    '''Base view relaying API calls on behalf of a bearer token holder.

    The bearer token of the request is resolved through the cache into the
    token and the scopes saved with it. Subclasses declare the scopes they
    need in a `scopes` attribute, which this class does not define.
    '''

    def dispatch(self, request, *args, **kwargs):
        '''Enforce Bearer authentication, then dispatch as usual.

        Answers with a BearerTokenUnauthorized response when the
        Authorization header is missing (no error code), when the token is
        malformed or not in the cache (`invalid_token`), or when the scopes
        saved with it do not cover `self.scopes` (`insufficient_scope`).
        '''
        authorization = request.META.get('HTTP_AUTHORIZATION')
        if not authorization:
            return BearerTokenUnauthorized()
        parts = authorization.split()
        if len(parts) != 2 or parts[0] != 'Bearer':
            return BearerTokenUnauthorized('invalid_token')
        # Only the decoding is guarded, so that an error raised by the
        # handler itself is never mistaken for an invalid token.
        try:
            access_token = base64.b64decode(parts[1])
        except (TypeError, ValueError):
            # Python 2 raises TypeError on malformed input, Python 3 raises
            # binascii.Error, which is a ValueError.
            return BearerTokenUnauthorized('invalid_token')
        state = cache.get('msp-access-token-' + access_token)
        if state is None:
            # Never issued, or already dropped from the cache.
            return BearerTokenUnauthorized('invalid_token')
        if not (set(self.scopes) <= set(state['scope'])):
            return BearerTokenUnauthorized('insufficient_scope')
        if 'token' not in state:
            return BearerTokenUnauthorized('invalid_token')
        self.oauth_session = OAuth2Session(app_settings.client_id,
                                           token=state['token'])
        return super(OAuth2ProxyView, self).dispatch(request, *args, **kwargs)

    def api_call(self, api_path, method='get', **kwargs):
        '''Call the API at api_path, relative to the configured API URL,
        with the session set up by dispatch(); `method` names the session
        method to use.
        '''
        url = urlparse.urljoin(app_settings.api_url, api_path)
        return getattr(self.oauth_session, method)(
            url,
            verify=app_settings.verify_certificate,
            cert=app_settings.client_certificate,
            **kwargs)
class DocumentsView(OAuth2ProxyView):
    '''Relay the `app/rest/documents` API call; needs the LIST_DOCS scope.'''
    scopes = ['LIST_DOCS']

    def get(self, request, *args, **kwargs):
        '''Return the result of the API call with a JSON content type.'''
        upstream = self.api_call('app/rest/documents')
        return HttpResponse(upstream, content_type='application/json')


documents = DocumentsView.as_view()
class DocumentView(OAuth2ProxyView):
    '''Relay the API call for a single document; needs the GET_DOC scope.'''
    scopes = ['GET_DOC']

    def get(self, request, *args, **kwargs):
        '''Return the result of the API call for the document named by the
        `doc_id` keyword argument, with a JSON content type.
        '''
        doc_id = kwargs['doc_id']
        api_path = 'app/rest/documents/{doc_id}'.format(doc_id=doc_id)
        upstream = self.api_call(api_path)
        return HttpResponse(upstream, content_type='application/json')


document = DocumentView.as_view()
class LoginOrLinkView(PopupViewMixin, MspOAuthSessionViewMixin, View):
    '''Log in through MSP or, failing that, link MSP to the current user.'''
    scopes = ['GET_AGC', 'DELETE_AGC']

    def get(self, request, *args, **kwargs):
        '''Request an access grant code, then log in with it, or associate
        it to the current user, or ask the user to authenticate first.
        '''
        agc = self.api_call('app/rest/agc').json()[ACCESS_GRANT_CODE]
        user = authenticate(agc=agc)
        if user:
            auth_login(request, user)
            return self.redirect(request)
        if not request.user.is_authenticated():
            messages.info(request, _('to create a link with msp, please authenticate'))
            return self.redirect_and_come_back(
                request, '{0}?nomsp=1'.format(settings.LOGIN_URL))
        serialized_token = json.dumps(self.token)
        msp_account, created = models.MspAccount.objects.get_or_create(
            user=request.user,
            defaults={'agc': agc, 'token': serialized_token})
        if not created:
            # The previous link is replaced.
            # XXX: we could add support for multi-link by changing the
            # unicity constraint tuple from (user,) to (user, agc).
            msp_account.agc = agc
            msp_account.token = serialized_token
            msp_account.save()
        messages.info(request, _('msp link created'))
        return self.redirect(request)


login_or_link = LoginOrLinkView.as_view()
class TemplateWithNextUrlView(TemplateView):
    '''Template view giving its template a `next` URL to continue to.'''

    def get_context_data(self, **kwargs):
        '''Add `next` to the context.

        It is the `next` query parameter when it is present and safe for the
        host of the request, and settings.LOGIN_REDIRECT_URL (or '/' when the
        setting is missing) otherwise.
        '''
        ctx = super(TemplateWithNextUrlView, self).get_context_data(**kwargs)
        next_url = self.request.GET.get('next')
        # The parameter comes straight from the query string: never hand an
        # off-site or non-HTTP URL over to the template.
        if not next_url or not is_safe_url(url=next_url,
                                           host=self.request.get_host()):
            next_url = getattr(settings, 'LOGIN_REDIRECT_URL', '/')
        ctx['next'] = next_url
        return ctx
class LinkManagementView(TemplateWithNextUrlView):
    '''Render the `msp/link_management.html` template.'''
    template_name = 'msp/link_management.html'


# login_required is the outermost decorator, so it is checked first.
link_management = login_required(
    mspaccount_required(LinkManagementView.as_view()))
class ConfirmUnlinkView(TemplateWithNextUrlView):
    '''Render the `msp/confirm_unlink.html` template.'''
    template_name = 'msp/confirm_unlink.html'

    def get_context_data(self, **kwargs):
        '''Extend the inherited context with `popup` set to True.'''
        context = super(ConfirmUnlinkView, self).get_context_data(**kwargs)
        context['popup'] = True
        return context


# login_required is the outermost decorator, so it is checked first.
confirm_unlink = login_required(
    mspaccount_required(ConfirmUnlinkView.as_view()))
class UnlinkDoneView(TemplateWithNextUrlView):
    '''Render the `msp/unlink_done.html` template.'''
    template_name = 'msp/unlink_done.html'
unlink_done = UnlinkDoneView.as_view()
class MoreRedirectView(RedirectView):
    '''Redirect to the `more_url` of the application settings.'''
    def get_redirect_url(self, *args, **kwargs):
        '''Return the configured URL, whatever the arguments.'''
        return app_settings.more_url
more_redirect = MoreRedirectView.as_view()