oauth2: improve coding style (#22717)

* declare all views with .as_view()
* add helper make_url() to properly add parameters to query string of
  redirect_uri
* stop threading the redirect_uri through the session
* make POST form target implicit, so that parameters are kept
* do checks in dispatch() to share them between POST and GET methods
This commit is contained in:
Benjamin Dauvergne 2018-03-21 21:44:32 +01:00
parent d5608c7478
commit 2fe7382ab3
4 changed files with 86 additions and 90 deletions

View File

@ -15,16 +15,14 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.conf.urls import url
from django.contrib.auth.decorators import login_required
from .views import (OAuth2AuthorizeView, get_document_token, get_document,
OAuth2AuthorizePutView, put_document)
from .views import (authorize_get_document, get_document_token, get_document,
authorize_put_document, put_document)
urlpatterns = [
url(r'get-document/authorize', login_required(OAuth2AuthorizeView.as_view()), name='oauth2-authorize'),
url(r'get-document/authorize', authorize_get_document, name='oauth2-authorize'),
url(r'get-document/token', get_document_token, name='oauth2-get-token'),
url(r'get-document/', get_document, name='oauth2-get-document'),
url(r'put-document/$', put_document, name='oauth2-put-document'),
url(r'put-document/(?P<pk>\w+)/authorize', login_required(OAuth2AuthorizePutView.as_view()),
name='oauth2-put-document-authorize')
url(r'put-document/(?P<pk>\w+)/authorize', authorize_put_document, name='oauth2-put-document-authorize')
]

View File

@ -14,15 +14,18 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import urlparse
from urllib import quote
from django.utils.translation import ugettext as _
from django.utils.http import urlencode
from django.core.files.base import ContentFile
from django.core.urlresolvers import reverse
from django.http import (HttpResponse, HttpResponseBadRequest,
HttpResponseRedirect)
from django.views.decorators.csrf import csrf_exempt
from django.views.generic import FormView, TemplateView
from django.contrib.auth.decorators import login_required
from rest_framework.response import Response
from rest_framework.views import APIView
@ -35,6 +38,16 @@ from .utils import authenticate_bearer, get_content_disposition_value
from fargo.fargo.models import UserDocument, Document
def make_url(url, **kwargs):
parsed = urlparse.urlparse(url)
query = urlparse.parse_qs(parsed.query)
for key, value in kwargs.iteritems():
if value is not None:
query[key] = value
parsed = parsed[:4] + (urlencode(query),) + parsed[5:]
return urlparse.urlunparse(parsed)
class OAuth2Exception(Exception):
pass
@ -53,76 +66,59 @@ class OAuth2AuthorizeView(FormView):
form_class = OAuth2AuthorizeForm
success_url = '/'
def get(self, request, *args, **kwargs):
redirect_uri = request.GET.get('redirect_uri')
if not redirect_uri:
return HttpResponseBadRequest('missing parameter redirect_uri')
def redirect(self, **kwargs):
'''Return to requester'''
return HttpResponseRedirect(make_url(self.redirect_uri, **kwargs))
def dispatch(self, request):
self.redirect_uri = request.GET.get('redirect_uri')
if not self.redirect_uri:
return HttpResponseBadRequest('missing redirect_uri parameter')
client_id = request.GET.get('client_id')
response_type = request.GET.get('response_type')
if not client_id or not response_type:
uri = self.error_redirect(redirect_uri, 'invalid_request')
return HttpResponseRedirect(uri)
return self.redirect(error='invalid_request')
if response_type != 'code':
uri = self.error_redirect(redirect_uri, 'unsupported_response_type')
return HttpResponseRedirect(uri)
return self.redirect(error='unsupported_response_type')
try:
client = OAuth2Client.objects.get(client_id=client_id)
if not client.check_redirect_uri(redirect_uri):
uri = self.error_redirect(redirect_uri, 'invalid_redirect_uri')
return HttpResponseRedirect(uri)
if not client.check_redirect_uri(self.redirect_uri):
return self.redirect(error='invalid_redirect_uri')
except OAuth2Client.DoesNotExist:
uri = self.error_redirect(redirect_uri, 'unauthorized_client')
return HttpResponseRedirect(uri)
return self.redirect(error='unauthorized_client')
self.state = request.GET.get('state', None)
return super(OAuth2AuthorizeView, self).dispatch(request)
request.session['redirect_uri'] = redirect_uri
return super(OAuth2AuthorizeView, self).get(request, *args, **kwargs)
def post(self, request, *args, **kwargs):
def post(self, request):
if 'cancel' in request.POST:
uri = self.error_redirect(request.session['redirect_uri'], 'access_denied')
return HttpResponseRedirect(uri)
else:
return super(OAuth2AuthorizeView, self).post(request, *args, **kwargs)
return self.redirect(error='access_denied')
return super(OAuth2AuthorizeView, self).post(request)
def get_form_kwargs(self):
kwargs = super(OAuth2AuthorizeView, self).get_form_kwargs()
kwargs['user'] = self.request.user
return kwargs
def error_redirect(self, redirect_uri, error_code):
if not redirect_uri[-1] == '/':
redirect_uri += '/'
redirect_uri += '?'
return redirect_uri + urllib.urlencode({'error': error_code})
def form_valid(self, form):
doc = form.cleaned_data['document']
authorization = OAuth2Authorize.objects.create(user_document=doc)
getvars = {'code': authorization.code}
document = form.cleaned_data['document']
authorization = OAuth2Authorize.objects.create(user_document=document)
return self.redirect(code=authorization.code, state=self.state)
if 'state' in self.request.GET:
getvars['state'] = self.request.GET['state']
redirect_uri = self.request.session['redirect_uri']
if not redirect_uri[-1] == '/':
redirect_uri += '/'
self.success_url = redirect_uri + '?' + urllib.urlencode(getvars)
return super(OAuth2AuthorizeView, self).form_valid(form)
authorize_get_document = login_required(OAuth2AuthorizeView.as_view())
class GetDocumentTokenView(OAUTH2APIViewMixin):
def post(self, request, *args, **kwargs):
client = request.user.oauth2_client
data = request.data
if not client.check_redirect_uri(data['redirect_uri']):
def post(self, request):
if not request.user.oauth2_client.check_redirect_uri(request.data['redirect_uri']):
return Response({'error': 'invalid_request'}, status=400)
if data['grant_type'] != 'authorization_code':
if request.data['grant_type'] != 'authorization_code':
return Response({'error': 'unsupported_grant_type'}, status=400)
try:
token = OAuth2Authorize.objects.get(code=data['code']).access_token
token = OAuth2Authorize.objects.get(code=request.data['code']).access_token
except OAuth2Authorize.DoesNotExist:
return Response({'error': 'invalid_request'}, status=400)
@ -159,8 +155,7 @@ class PutDocumentAPIView(OAUTH2APIViewMixin):
document = Document.objects.get_by_file(f)
oauth2_document = OAuth2TempFile.objects.create(document=document,
filename=filename)
uri = reverse('oauth2-put-document-authorize',
args=[oauth2_document.pk]) + '/'
uri = reverse('oauth2-put-document-authorize', args=[oauth2_document.pk]) + '/'
response = Response()
response['Location'] = uri
@ -173,46 +168,49 @@ put_document = PutDocumentAPIView.as_view()
class OAuth2AuthorizePutView(TemplateView):
template_name = 'oauth2/confirm.html'
def get_context_data(self, **kwargs):
context = super(OAuth2AuthorizePutView, self).get_context_data(**kwargs)
try:
oauth2_document = OAuth2TempFile.objects.get(pk=kwargs['pk'])
def redirect(self, **kwargs):
'''Return to requester'''
return HttpResponseRedirect(make_url(self.redirect_uri, **kwargs))
except OAuth2TempFile.DoesNotExist:
context['error_message'] = _('The document has not been uploaded')
context['redirect_uri'] = self.request.GET['redirect_uri']
return context
user_document = UserDocument.objects.filter(user=self.request.user,
document=oauth2_document.document)
if user_document:
context['error_message'] = _('This document is already in your portfolio')
context['redirect_uri'] = self.request.GET['redirect_uri']
return context
else:
context['filename'] = oauth2_document.filename
context['error_message'] = ''
return context
def get(self, request, *args, **kwargs):
redirect_uri = request.GET.get('redirect_uri', '')
if not redirect_uri:
def dispatch(self, request, *args, **kwargs):
self.redirect_uri = request.GET.get('redirect_uri', '')
if not self.redirect_uri:
return HttpResponseBadRequest('missing redirect_uri parameter')
self.oauth2_document = OAuth2TempFile.objects.filter(pk=kwargs['pk']).first()
return super(OAuth2AuthorizePutView, self).dispatch(request)
request.session['redirect_uri'] = redirect_uri
return super(OAuth2AuthorizePutView, self).get(request, *args, **kwargs)
def get_context_data(self, **kwargs):
if self.oauth2_document:
# verify if document already exists
if not UserDocument.objects.filter(
user=self.request.user,
document=self.oauth2_document.document).exists():
kwargs['filename'] = self.oauth2_document.filename
kwargs['error_message'] = ''
else:
kwargs['error_message'] = _('This document is already in your portfolio')
kwargs['redirect_uri'] = self.request.GET['redirect_uri']
else:
kwargs['error_message'] = _('The document has not been uploaded')
kwargs['redirect_uri'] = self.request.GET['redirect_uri']
return super(OAuth2AuthorizePutView, self).get_context_data(**kwargs)
def post(self, request):
if not self.oauth2_document:
return self.get(request)
def post(self, request, *args, **kwargs):
oauth2_document = OAuth2TempFile.objects.get(pk=kwargs['pk'])
try:
if 'cancel' in request.POST:
error = urllib.urlencode({'error': 'access_denied'})
uri = request.session['redirect_uri'] + '?' + error
return HttpResponseRedirect(uri)
return self.redirect(error='access_denied')
UserDocument.objects.create(
user=request.user, document=oauth2_document.document,
filename=oauth2_document.filename)
return HttpResponseRedirect(request.session['redirect_uri'])
user=request.user,
document=self.oauth2_document.document,
filename=self.oauth2_document.filename)
return self.redirect()
finally:
oauth2_document.delete()
self.oauth2_document.delete()
authorize_put_document = login_required(OAuth2AuthorizePutView.as_view())

View File

@ -10,8 +10,8 @@
<p>{% blocktrans %}
Do you accept to add<b> {{ filename }} </b>to your portfolio?
{% endblocktrans %}</p>
<form id="send-file" method="post" enctype="multipart/form-data" action="{% url 'oauth2-put-document-authorize' pk %}">
{% csrf_token %}
<form id="send-file" method="post" enctype="multipart/form-data">
{% csrf_token %}
<input type="submit" name="submit" value="{% trans "Allow" %}"/>
<input type="submit" name="cancel" value="{% trans "Cancel" %}"/>
</form>

View File

@ -56,7 +56,7 @@ def test_get_document_oauth2(app, john_doe, oauth2_client, user_doc):
}
# test missing redirect_uri
resp = app.get(url, params={}, status=400)
assert resp.content == 'missing parameter redirect_uri'
assert resp.content == 'missing redirect_uri parameter'
# test missing client id
params['redirect_uri'] = 'https://toto.example.com'
resp = app.get(url, params=params, status=302)