This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.

105 lines
3.7 KiB

import logging
import uuid
import requests
from django.template import RequestContext
from rest_framework.decorators import (api_view, authentication_classes)
from rest_framework.authentication import (OAuth2Authentication,
from rest_framework.response import Response
from provider.oauth2.views import Authorize
from provider import scope
from authentic2.models import FederatedId
from . import forms, app_settings, models, signature
__ALL_ = [ 'user_info', 'Authorize' ]
logger = logging.getLogger(__name__)
def add_targeted_id(request, data):
'''Retrieve a targeted id for the user and this client, if none exist
create one using a random UUID.
if request.auth is not None:
if hasattr(request.auth, 'client'):
client = request.auth.client
user = request.user
fedid, created = FederatedId.objects.get_or_create_for_local_user_and_service(
user, client,
'urn:uuid:%s' % uuid.uuid4())
data['targeted_id'] = fedid.id_value
return data
def delete_targeted_id(request, data):
'''Delete the targeted id'''
if request.auth is not None:
if hasattr(request.auth, 'client'):
client = request.auth.client
user = request.user
qs = FederatedId.objects.for_local_user_and_service(
user, client)
@api_view(['GET', 'DELETE'])
@authentication_classes([OAuth2Authentication, SessionAuthentication])
def user_info(request):
'''User info endpoint'''
data = {}
if request.user and request.user.is_authenticated():
if request.method == 'GET':
user = request.user
data = {
'username': user.username,
'first_name': user.first_name,
'last_name': user.last_name,
'display_name': user.get_full_name(),
'role': user.groups.values_list('name', flat=True),
add_targeted_id(request, data)
elif request.method == 'DELETE':
delete_targeted_id(request, data)
return Response(data)
class Authorize(Authorize):
'''Overload the default Authorize view of django-oauth2-provider to permit
automatic grant for some scopes and some clients
def get_authorization_form(self, request, client, data, client_data):
for url_prefix, scopes in app_settings.AUTOMATIC_GRANT:
if client.url.startswith(url_prefix) and \
scope.check(client_data['scope'], scope.to_int(*scopes)):
# return an always valid form
return forms.EmptyForm({}, scope=client_data['scope'])
return super(Authorize, self).get_authorization_form(
request, client, data, client_data)
@api_view(['GET', 'POST', 'PUT', 'DELETE'])
@authentication_classes([OAuth2Authentication, SessionAuthentication])
def ws_proxy(request, ws_id):
ws = models.WebService.objects.get(id=ws_id)
except models.WebService.DoesNotExist:
ws = models.WebService.objects.get(slug=ws_id)
ctx = RequestContext(request)
url = ws.get_url(ctx)
logger.debug('proxy to URL %r', url)
method = request.method.lower()
if ws.signature_key and ws.auth_mech.startswith('hmac-'):
url = signature.sign_url(url, ws.signature_key,
response = getattr(requests, method)(url,
return Response(response.json())