466 lines
18 KiB
Python
466 lines
18 KiB
Python
import urllib
|
|
import requests
|
|
import logging
|
|
import urlparse
|
|
import base64
|
|
import uuid
|
|
import json
|
|
|
|
|
|
from requests_oauthlib import OAuth2Session
|
|
|
|
from django.core.urlresolvers import reverse
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from django.views.generic import View, RedirectView
|
|
from django.views.generic.base import TemplateView
|
|
from django.http import HttpResponse, HttpResponseRedirect
|
|
from django.contrib.auth import authenticate, login as auth_login, REDIRECT_FIELD_NAME
|
|
from django.contrib.auth.views import redirect_to_login
|
|
from django.contrib.auth.decorators import login_required, user_passes_test
|
|
from django.contrib import messages
|
|
from django.shortcuts import resolve_url, render
|
|
from django.utils.translation import ugettext as _
|
|
from django.utils.http import is_safe_url
|
|
from django.conf import settings
|
|
from django.core.cache import InvalidCacheBackendError, get_cache
|
|
from django.views.decorators.clickjacking import xframe_options_exempt
|
|
|
|
from . import app_settings, models
|
|
|
|
def user_has_mspaccount(user):
    '''Return True if the user has a link to an MSP account.

    ``user`` is any object which may expose an ``mspaccount`` attribute.
    False is returned when the attribute is missing, when it is None or
    when reading it raises ``MspAccount.DoesNotExist``.
    '''
    try:
        return user.mspaccount is not None
    except AttributeError:
        # Objects without an ``mspaccount`` attribute have no link; this
        # handler comes first so the common failure never needs the model.
        return False
    except models.MspAccount.DoesNotExist:
        return False
|
|
|
|
# View decorator built on user_has_mspaccount(); '/' is given as the login URL
# for users failing the test.
mspaccount_required = user_passes_test(user_has_mspaccount, login_url='/')


logger = logging.getLogger(__name__)

# Prefer the cache named 'msp', fall back on the 'default' one when the
# former is not a valid cache backend.
try:
    cache = get_cache('msp')
except InvalidCacheBackendError:
    cache = get_cache('default')

# Timeout given to every cache.set() call of this module
CACHE_TIMEOUT = 60
|
|
|
|
def ask_authorization(request, scopes):
    '''Redirect to the authorization endpoint to obtain the given scopes.

    ``scopes`` is a list or tuple of scope names, any other value is
    treated as one single scope. The absolute URL of the current request
    is sent as ``redirect_uri`` and, base64 encoded, as ``state``.
    '''
    scope_list = scopes if isinstance(scopes, (list, tuple)) else [scopes]
    callback = request.build_absolute_uri()
    query = urllib.urlencode({
        'client_id': app_settings.client_id,
        'scope': ' '.join(scope_list),
        'redirect_uri': callback,
        'response_type': 'code',
        'state': base64.b64encode(callback),
    })
    target = '{0}?{1}'.format(app_settings.authorize_url, query)
    return HttpResponseRedirect(target)
|
|
|
|
def resolve_access_token(authorization_code, redirect_uri):
    '''Exchange an authorization code for a token at the token endpoint.

    The code, the client credentials and the redirect URI are POSTed to
    ``app_settings.token_url``; the decoded JSON body of the response is
    returned as is, without looking at the status of the response.
    '''
    payload = dict(
        code=authorization_code,
        client_id=app_settings.client_id,
        client_secret=app_settings.client_secret,
        redirect_uri=redirect_uri,
        grant_type='authorization_code',
    )
    token_response = requests.post(
        app_settings.token_url,
        data=payload,
        verify=app_settings.verify_certificate,
        cert=app_settings.client_certificate)
    return token_response.json()
|
|
|
|
def access_token_from_request(request):
    '''Resolve the access token for a request coming back from the
    authorization endpoint.

    The redirect URI is decoded from the base64 ``state`` parameter when
    there is one, else it is read from the ``redirect_uri`` parameter.
    '''
    query = request.GET
    code = query['code']
    if 'state' in query:
        callback = base64.b64decode(query.get('state'))
    else:
        callback = query['redirect_uri']
    return resolve_access_token(code, callback)
|
|
|
|
# Key under which the access grant code is read from the JSON answer of the
# 'app/rest/agc' API call.
ACCESS_GRANT_CODE = 'accessgrantcode'
|
|
|
|
class MspOAuthSessionViewMixin(object):
    '''View mixin running the OAuth2 authorization code dance before the
    wrapped view.

    dispatch() looks at the query string: without ``code`` nor ``error``
    the browser is sent to the authorization endpoint; with ``error`` the
    user is redirected to the return URL; with ``code`` the token is
    resolved, stored on ``self.token`` and the regular view is run, which
    can then use api_call().
    '''
    # Scopes asked to the authorization endpoint, see get_scopes()
    scopes = []
    # Name of the request parameter holding the return URL
    redirect_field_name = REDIRECT_FIELD_NAME
    # Default answer of get_in_popup()
    in_popup = False
    # Token data obtained from the token endpoint, set by dispatch()
    token = None

    def get_in_popup(self):
        '''Tell if the view is displayed inside a popup window.'''
        return self.in_popup

    def redirect_to(self, request, *args, **kwargs):
        '''Return the URL to go back to once the view is done.

        The URL is read from the ``redirect_field_name`` parameter, POST
        data having priority over the query string; when it is missing or
        not safe for the current host ``settings.LOGIN_REDIRECT_URL`` is
        used instead.
        '''
        field = self.redirect_field_name
        # Same lookup order as the deprecated request.REQUEST: POST then GET
        redirect_to = request.POST.get(field, request.GET.get(field, ''))
        if not is_safe_url(url=redirect_to, host=request.get_host()):
            redirect_to = resolve_url(settings.LOGIN_REDIRECT_URL)
        return redirect_to

    def close_popup_redirect(self, request, next_url, *args, **kwargs):
        '''Show a page to close the current popup and reload the parent
        window with the return url.
        '''
        return render(request, 'msp/close-popup-redirect.html',
                      {'redirect_to': next_url})

    def simple_redirect(self, request, next_url, *args, **kwargs):
        '''Redirect the browser to next_url.'''
        return HttpResponseRedirect(next_url)

    def redirect(self, request, *args, **kwargs):
        '''Leave the view, going to the ``next_url`` keyword argument or,
        when it is absent or None, to the URL computed by redirect_to().

        The redirection is done through the popup closing page when
        get_in_popup() is true.
        '''
        next_url = kwargs.pop('next_url', None)
        if next_url is None:
            next_url = self.redirect_to(request, *args, **kwargs)
        if self.get_in_popup():
            return self.close_popup_redirect(request, next_url, *args, **kwargs)
        return self.simple_redirect(request, next_url, *args, **kwargs)

    def redirect_and_come_back(self, request, next_url, *args, **kwargs):
        '''Redirect to next_url, asking it to send the user back to this
        view, the current return URL being carried along.
        '''
        old_next_url = self.redirect_to(request, *args, **kwargs)
        # The way back targets this very view: the return URL must be named
        # after the parameter redirect_to() reads, which a subclass can
        # override.
        here = '{0}?{1}'.format(
            request.path,
            urllib.urlencode({self.redirect_field_name: old_next_url}))
        # next_url is handled by another view, REDIRECT_FIELD_NAME is kept
        # for it.
        there = '{0}{2}{1}'.format(
            next_url,
            urllib.urlencode({REDIRECT_FIELD_NAME: here}),
            '&' if '?' in next_url else '?')
        return self.redirect(request, next_url=there, *args, **kwargs)

    def get_scopes(self):
        '''Return the scopes to ask for.'''
        return self.scopes

    def api_call(self, api_path, method='get', **kwargs):
        '''Call the API at api_path, relative to ``app_settings.api_url``,
        through the OAuth session set up by dispatch().

        ``method`` is the name of the session method to use, extra keyword
        arguments are passed to it. The response object is returned.
        '''
        url = urlparse.urljoin(app_settings.api_url, api_path)
        return getattr(self.oauth_session(), method)(
            url,
            verify=app_settings.verify_certificate,
            cert=app_settings.client_certificate,
            **kwargs)

    def authorization_error(self, request, *args, **kwargs):
        '''Handle an error returned by the authorization endpoint: warn
        the user when access was denied, then redirect.
        '''
        # Same lookup order as the deprecated request.REQUEST: POST then GET
        error = request.POST.get('error', request.GET.get('error'))
        if error == 'access_denied':
            messages.warning(request, _('request denied by user'))
        logger.debug('authorization_error %r', request.GET)
        return self.redirect(request)

    def dispatch(self, request, *args, **kwargs):
        '''Interpret the OAuth authorization dance'''
        if 'code' in request.GET:
            self.token = access_token_from_request(request)
            # The session is built lazily, each time api_call() needs it
            self.oauth_session = lambda: OAuth2Session(
                app_settings.client_id, token=self.token)
            return super(MspOAuthSessionViewMixin, self).dispatch(
                request, *args, **kwargs)
        elif 'error' in request.GET:
            return self.authorization_error(request, *args, **kwargs)
        else:
            return ask_authorization(request, self.get_scopes())
|
|
|
|
class PopupViewMixin(object):
    '''View mixin switching to popup mode when the request has a ``popup``
    parameter.
    '''

    def get_in_popup(self):
        '''Return True if ``popup`` is found in the POST data or in the
        query string of ``self.request``.
        '''
        request = self.request
        # Explicit replacement of the deprecated request.REQUEST lookup
        return 'popup' in request.POST or 'popup' in request.GET
|
|
|
|
class LoginView(PopupViewMixin, MspOAuthSessionViewMixin, View):
    '''Log a user in using the access grant code obtained from MSP.'''
    scopes = ['GET_AGC', 'DELETE_AGC']

    def dispatch(self, request, *args, **kwargs):
        '''Authenticated users are redirected at once, others go through
        the OAuth dance.
        '''
        if not request.user.is_authenticated():
            return super(LoginView, self).dispatch(request, *args, **kwargs)
        return self.redirect(request)

    def get(self, request, *args, **kwargs):
        '''Fetch the access grant code and authenticate with it; a warning
        is shown when authentication returns no user.
        '''
        payload = self.api_call('app/rest/agc').json()
        user = authenticate(agc=payload[ACCESS_GRANT_CODE])
        if not user:
            messages.warning(request, _('no local account is linked to your MSP account'))
        else:
            auth_login(request, user)
        return self.redirect(request)


login = LoginView.as_view()
|
|
|
|
class LinkView(PopupViewMixin, MspOAuthSessionViewMixin, View):
    '''Link the MSP account to the logged in user.'''
    scopes = ['GET_AGC', 'DELETE_AGC']

    def dispatch(self, request, *args, **kwargs):
        '''Send anonymous users coming back with an authorization code to
        the login page, the current URL being the place to come back to.
        '''
        needs_login = ('code' in request.GET
                       and not request.user.is_authenticated())
        if needs_login:
            return redirect_to_login(request.get_full_path())
        return super(LinkView, self).dispatch(request, *args, **kwargs)

    def get(self, request, *args, **kwargs):
        '''Fetch the access grant code and create the link with the
        current user, unless an account with this code already exists.
        '''
        agc = self.api_call('app/rest/agc').json()[ACCESS_GRANT_CODE]
        try:
            models.MspAccount.objects.get(agc=agc)
        except models.MspAccount.DoesNotExist:
            models.MspAccount.objects.create(
                user=request.user, agc=agc, token=json.dumps(self.token))
            notice = _('msp link created')
        else:
            notice = _('msp link already exists')
        messages.info(request, notice)
        return self.redirect(request)


link = LinkView.as_view()
|
|
|
|
class UnlinkView(PopupViewMixin, MspOAuthSessionViewMixin, TemplateView):
    '''Remove the links between the current user and MSP.'''
    scopes = ['DELETE_AGC']

    def get(self, request, *args, **kwargs):
        '''Delete the access grant code through the API then the local
        links; on success the user lands on the 'msp-unlink-done' page.
        '''
        accounts = models.MspAccount.objects.filter(user=request.user)
        if not accounts.exists():
            logger.debug('no msp account exist doing nothing')
            return self.redirect(request)

        response = self.api_call('app/rest/agc', method='delete')
        # Local links are kept unless the API answered with a 2xx status
        if response.status_code // 100 != 2:
            logger.debug('error when deleting msp link %r', response.content)
            return self.redirect(request)

        accounts.delete()
        logger.debug('all link deleted for %r', request.user)
        return_url = self.redirect_to(request, *args, **kwargs)
        done_url = reverse('msp-unlink-done')
        query = urllib.urlencode({REDIRECT_FIELD_NAME: return_url})
        return self.redirect(request,
                             next_url='{0}?{1}'.format(done_url, query))


unlink = login_required(UnlinkView.as_view())
|
|
|
|
class AuthorizeView(View):
    '''Authorization endpoint relaying authorization requests to MSP.

    An incoming authorization request is saved in the cache and forwarded
    to ``app_settings.authorize_url`` with this view as redirect URI; when
    MSP comes back with a code, a new code is generated, stored with the
    MSP one and sent to the redirect URI of the initial request.
    '''

    def get(self, request, *args, **kwargs):
        query = request.GET
        if 'code' in query:
            return self._relay_code(request)
        if 'error' in query:
            # Errors coming back from MSP are not handled
            assert False
        else:
            return self._start_authorization(request)

    def _relay_code(self, request):
        '''Swap the MSP code for a local one and go back to the client.'''
        query = request.GET
        assert 'state' in query
        saved = cache.get('msp-state-' + query['state'])
        client_redirect_uri = saved['redirect_uri']
        client_state = saved['state']
        local_code = str(uuid.uuid4())
        cache.set('msp-code-' + local_code, {
            'msp_code': query['code'],
            'msp_redirect_uri': request.build_absolute_uri(request.path),
            'redirect_uri': client_redirect_uri,
            'scope': saved['scope'],
        }, CACHE_TIMEOUT)
        # Parameters received from MSP are merged over those already
        # present on the client redirect URI.
        target = urlparse.urlparse(client_redirect_uri)
        params = urlparse.parse_qs(target.query)
        params.update(dict(query))
        params['code'] = local_code
        # Give back the state of the client, or no state if it sent none
        if client_state:
            params['state'] = client_state
        else:
            del params['state']
        target = target._replace(query=urllib.urlencode(params, True))
        return HttpResponseRedirect(urlparse.urlunparse(target))

    def _start_authorization(self, request):
        '''Save the client request and forward it to MSP.'''
        query = request.GET
        assert 'redirect_uri' in query
        assert 'scope' in query
        assert 'response_type' in query
        assert query['response_type'] == 'code'
        callback = request.build_absolute_uri(request.path)
        local_state = str(uuid.uuid4())
        logger.debug('%s: received MSP authorization demande %r', local_state, query)
        # save state
        cache.set('msp-state-' + local_state, {
            'msp_redirect_uri': callback,
            'scope': query['scope'].split(),
            'redirect_uri': query['redirect_uri'],
            'state': query.get('state')
        }, CACHE_TIMEOUT)
        params = dict(query)
        params.update(
            state=local_state,
            client_id=app_settings.client_id,
            redirect_uri=callback)
        url = '{0}?{1}'.format(app_settings.authorize_url,
                               urllib.urlencode(params, True))
        return HttpResponseRedirect(url)


authorize = xframe_options_exempt(AuthorizeView.as_view())
|
|
|
|
class AccessToken(View):
    '''Token endpoint exchanging a code issued by the authorize view for a
    bearer token.

    The code is looked up in the cache, the MSP code stored with it is
    resolved at the MSP token endpoint and the resulting token is kept in
    the cache under a newly generated key, which is returned base64
    encoded as the access token.
    '''

    def post(self, request, *args, **kwargs):
        '''Answer a token request; a request failing validation gets a
        JSON error document with a 400 status.
        '''
        try:
            return self.access_token(request, *args, **kwargs)
        except AssertionError as e:
            # The traceback is logged as the error is no more propagated
            logger.exception('access token request rejected')
            result = {
                'error': 'internal_error',
                'description': repr(e),
            }
            return HttpResponse(json.dumps(result),
                                content_type='application/json',
                                status=400)

    @staticmethod
    def _require(condition, reason):
        '''Raise AssertionError(reason) unless condition is true.

        Unlike the ``assert`` statement this check is not removed when
        Python runs with -O, which matters for the credentials check.
        '''
        if not condition:
            raise AssertionError(reason)

    def access_token(self, request, *args, **kwargs):
        '''Validate the token request and build the token response.

        Raises AssertionError when a parameter is missing, the grant type
        is not ``authorization_code``, the client credentials are not in
        ``app_settings.client_credentials``, the code is unknown or the
        redirect URI differs from the one saved with the code.
        '''
        post = request.POST
        for name in ('code', 'client_id', 'client_secret', 'redirect_uri',
                     'grant_type'):
            self._require(name in post, 'missing parameter %s' % name)
        self._require(post['grant_type'] == 'authorization_code',
                      'unsupported grant_type')
        credentials = (post['client_id'], post['client_secret'])
        self._require(credentials in app_settings.client_credentials,
                      'unknown client credentials')
        state = cache.get('msp-code-' + post['code'])
        self._require(state is not None, 'unknown code')
        self._require('redirect_uri' in state, 'no redirect_uri saved')
        self._require(state['redirect_uri'] == post['redirect_uri'],
                      'redirect_uri mismatch')
        result = resolve_access_token(state['msp_code'],
                                      state['msp_redirect_uri'])
        if 'access_token' in result:
            # The MSP token stays in the cache, the client only gets the
            # key to find it again.
            token = str(uuid.uuid4())
            cache.set('msp-access-token-' + token, {
                'token': result,
                'scope': state['scope'],
            }, CACHE_TIMEOUT)
            new_result = {
                'access_token': base64.b64encode(token),
                'token_type': 'Bearer',
            }
            if 'expires_in' in result:
                new_result['expires_in'] = result['expires_in']
            result = new_result
        # Without access_token the MSP answer is returned untouched
        return HttpResponse(json.dumps(result),
                            content_type='application/json')


access_token = csrf_exempt(AccessToken.as_view())
|
|
|
|
|
|
class BearerTokenUnauthorized(HttpResponse):
    '''401 response with a ``WWW-Authenticate: Bearer`` header, the header
    holding the error code when one is given.
    '''
    status_code = 401

    def __init__(self, error=None):
        super(BearerTokenUnauthorized, self).__init__('Unauthorized')
        challenge = ['Bearer']
        if error:
            challenge.append(' error="%s"' % error)
        self['WWW-Authenticate'] = ''.join(challenge)
|
|
|
|
|
|
class OAuth2ProxyView(View):
    '''Base class of the API views protected by a bearer token.

    The token is the one returned by the access token view; subclasses
    list in ``scopes`` the scopes the token must have been granted and use
    api_call() to reach the API.
    '''

    def dispatch(self, request, *args, **kwargs):
        '''Enforce bearer authentication before running the view.

        A 401 response is returned when the Authorization header is
        missing, when the token is malformed or unknown (``invalid_token``)
        and when it lacks a required scope (``insufficient_scope``).
        Only the validation of the token is reported as ``invalid_token``,
        exceptions raised by the view itself are propagated.
        '''
        # enforce Bearer authentication
        authorization = request.META.get('HTTP_AUTHORIZATION')
        if not authorization:
            return BearerTokenUnauthorized()
        state = self._token_state(authorization)
        if state is None:
            return BearerTokenUnauthorized('invalid_token')
        if not (set(self.scopes) <= set(state['scope'])):
            return BearerTokenUnauthorized('insufficient_scope')
        self.oauth_session = OAuth2Session(app_settings.client_id,
                                           token=state['token'])
        return super(OAuth2ProxyView, self).dispatch(request, *args, **kwargs)

    def _token_state(self, authorization):
        '''Return the state cached for the token of an Authorization
        header, or None if the header or the token is not valid.
        '''
        parts = authorization.split()
        if len(parts) != 2 or parts[0] != 'Bearer':
            return None
        try:
            access_token = base64.b64decode(parts[1])
        except (TypeError, ValueError):
            # Python 2 reports badly padded input with TypeError, Python 3
            # with binascii.Error which is a ValueError.
            return None
        state = cache.get('msp-access-token-' + access_token)
        # cache.get() gives None for unknown or expired tokens
        if not isinstance(state, dict):
            return None
        if 'scope' not in state or 'token' not in state:
            return None
        return state

    def api_call(self, api_path, method='get', **kwargs):
        '''Call the API at api_path, relative to ``app_settings.api_url``,
        with the session built by dispatch(); ``method`` names the session
        method to use and the response object is returned.
        '''
        url = urlparse.urljoin(app_settings.api_url, api_path)
        return getattr(self.oauth_session, method)(
            url,
            verify=app_settings.verify_certificate,
            cert=app_settings.client_certificate,
            **kwargs)
|
|
|
|
|
|
class DocumentsView(OAuth2ProxyView):
    '''Relay the 'app/rest/documents' API call, the LIST_DOCS scope is
    required.
    '''
    scopes = ['LIST_DOCS']

    def get(self, request, *args, **kwargs):
        '''Return the API response wrapped as a JSON response.'''
        upstream = self.api_call('app/rest/documents')
        return HttpResponse(upstream, content_type='application/json')


documents = DocumentsView.as_view()
|
|
|
|
|
|
class DocumentView(OAuth2ProxyView):
    '''Relay the API call for one document, the GET_DOC scope is
    required.
    '''
    scopes = ['GET_DOC']

    def get(self, request, *args, **kwargs):
        '''Return the API response for the document named by the
        ``doc_id`` keyword argument, wrapped as a JSON response.
        '''
        api_path = 'app/rest/documents/{doc_id}'.format(doc_id=kwargs['doc_id'])
        upstream = self.api_call(api_path)
        return HttpResponse(upstream, content_type='application/json')


document = DocumentView.as_view()
|
|
|
|
|
|
class LoginOrLinkView(PopupViewMixin, MspOAuthSessionViewMixin, View):
    '''Log in with the MSP account when it is linked to a local account,
    else link it to the current user.
    '''
    scopes = ['GET_AGC', 'DELETE_AGC']

    def get(self, request, *args, **kwargs):
        '''Fetch the access grant code, then log in, link or ask the user
        to authenticate.
        '''
        agc = self.api_call('app/rest/agc').json()[ACCESS_GRANT_CODE]
        user = authenticate(agc=agc)
        if user:
            auth_login(request, user)
            return self.redirect(request)

        if not request.user.is_authenticated():
            # Nobody to link with: go to the login page and come back
            messages.info(request, _('to create a link with msp, please authenticate'))
            login_url = '{0}?nomsp=1'.format(settings.LOGIN_URL)
            return self.redirect_and_come_back(request, login_url)

        link_defaults = {
            'agc': agc,
            'token': json.dumps(self.token),
        }
        msp_account, created = models.MspAccount.objects.get_or_create(
            defaults=link_defaults, user=request.user)
        if not created:
            # Previous links is replaced
            # XXX: we could add support for multi-link by changing the
            # unicity constraint tuple from (user,) to (user, agc).
            msp_account.agc = agc
            msp_account.token = json.dumps(self.token)
            msp_account.save()
        messages.info(request, _('msp link created'))
        return self.redirect(request)


login_or_link = LoginOrLinkView.as_view()
|
|
|
|
class TemplateWithNextUrlView(TemplateView):
    '''Template view exposing a ``next`` URL to its template.'''

    def get_context_data(self, **kwargs):
        '''Add ``next`` to the context: the ``next`` query parameter when
        it is not empty, else ``settings.LOGIN_REDIRECT_URL`` or '/'.
        '''
        context = super(TemplateWithNextUrlView, self).get_context_data(**kwargs)
        next_url = self.request.GET.get('next')
        if not next_url:
            next_url = getattr(settings, 'LOGIN_REDIRECT_URL', '/')
        context['next'] = next_url
        return context
|
|
|
|
class LinkManagementView(TemplateWithNextUrlView):
    '''Page rendered with the 'msp/link_management.html' template.'''
    template_name = 'msp/link_management.html'


# login_required is the outermost decorator, the MSP account test is wrapped
# inside it.
link_management = login_required(
    mspaccount_required(LinkManagementView.as_view()))
|
|
|
|
class ConfirmUnlinkView(TemplateWithNextUrlView):
    '''Page rendered with the 'msp/confirm_unlink.html' template.'''
    template_name = 'msp/confirm_unlink.html'

    def get_context_data(self, **kwargs):
        '''Extend the inherited context with ``popup`` set to True.'''
        context = super(ConfirmUnlinkView, self).get_context_data(**kwargs)
        context.update(popup=True)
        return context


# login_required is the outermost decorator, the MSP account test is wrapped
# inside it.
confirm_unlink = login_required(
    mspaccount_required(ConfirmUnlinkView.as_view()))
|
|
|
|
|
|
class UnlinkDoneView(TemplateWithNextUrlView):
    '''Page rendered with the 'msp/unlink_done.html' template.'''
    template_name = 'msp/unlink_done.html'


unlink_done = UnlinkDoneView.as_view()
|
|
|
|
class MoreRedirectView(RedirectView):
    '''Redirect to the URL configured as ``app_settings.more_url``.'''

    def get_redirect_url(self, *args, **kwargs):
        '''Return the target of the redirection, arguments are ignored.'''
        target = app_settings.more_url
        return target


more_redirect = MoreRedirectView.as_view()
|
|
|