This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
authentic2-idp-oauth2/authentic2_idp_oauth2/views.py

129 lines
4.7 KiB
Python

import logging
import uuid
import requests
from django.template import RequestContext
from rest_framework.decorators import (api_view, authentication_classes)
from rest_framework.authentication import (OAuth2Authentication,
SessionAuthentication)
from rest_framework.response import Response
from provider.oauth2.views import Authorize
from provider import scope
from authentic2.models import FederatedId
from authentic2.attributes_ng.engine import get_attributes
from . import forms, app_settings, models, signature
__ALL_ = [ 'user_info', 'Authorize' ]
logger = logging.getLogger(__name__)
def add_targeted_id(request, data):
'''Retrieve a targeted id for the user and this client, if none exist
create one using a random UUID.
'''
if request.auth is not None:
if hasattr(request.auth, 'client'):
client = request.auth.client
user = request.user
fedid, created = FederatedId.objects.get_or_create_for_local_user_and_service(
user, client,
'urn:oasis:names:tc:SAML:2.0:nameid-format:persistent',
'urn:uuid:%s' % uuid.uuid4())
data['targeted_id'] = fedid.id_value
return data
def delete_targeted_id(request, data):
'''Delete the targeted id'''
if request.auth is not None:
if hasattr(request.auth, 'client'):
client = request.auth.client
user = request.user
qs = FederatedId.objects.for_local_user_and_service(
user, client)
qs.delete()
@api_view(['GET', 'DELETE'])
@authentication_classes([OAuth2Authentication, SessionAuthentication])
def user_info(request):
'''User info endpoint'''
data = {}
if request.user and request.user.is_authenticated():
if request.method == 'GET':
data = {
}
if request.auth is None:
ctx = get_attributes({
'request': request,
'user': request.user,
})
for key, value in ctx.iteritems():
if isinstance(value, (str, unicode)):
data[key] = value
elif isinstance(value, (list, tuple)) and value and isinstance(value[0], (str, unicode)):
data[key] = value
else:
client = request.auth.client
qs = models.AttributeRelease.objects.filter(client=client)
wanted_attributes = [a.attribute_name for a in qs]
ctx = get_attributes({
'request': request,
'user': request.user,
'provider': client,
'__wanted_attributes': wanted_attributes,
})
for a in qs:
if a.attribute_name in ctx:
data[a.name] = ctx[a.attribute_name]
add_targeted_id(request, data)
elif request.method == 'DELETE':
delete_targeted_id(request, data)
return Response(data)
class Authorize(Authorize):
'''Overload the default Authorize view of django-oauth2-provider to permit
automatic grant for some scopes and some clients
'''
def get_authorization_form(self, request, client, data, client_data):
automatic_grant = app_settings.AUTOMATIC_GRANT
if hasattr(client, 'a2client'):
client_scopes = client.a2client.authorized_scopes
client_scopes = filter(None, map(unicode.strip, client_scopes.strip().split(' ')))
automatic_grant += ((client.url, client_scopes),)
for url_prefix, scopes in automatic_grant:
if client_data['redirect_uri'].startswith(url_prefix) and \
scope.check(client_data['scope'], scope.to_int(*scopes)):
return forms.EmptyForm({}, scope=client_data['scope'])
return super(Authorize, self).get_authorization_form(
request, client, data, client_data)
@api_view(['GET', 'POST', 'PUT', 'DELETE'])
@authentication_classes([OAuth2Authentication, SessionAuthentication])
def ws_proxy(request, ws_id):
try:
ws = models.WebService.objects.get(id=ws_id)
except models.WebService.DoesNotExist:
ws = models.WebService.objects.get(slug=ws_id)
ctx = RequestContext(request)
url = ws.get_url(ctx)
logger.debug('proxy to URL %r', url)
method = request.method.lower()
if ws.signature_key and ws.auth_mech.startswith('hmac-'):
url = signature.sign_url(url, str(ws.signature_key),
algo=ws.auth_mech[5:])
response = getattr(requests, method)(url,
verify=ws.verify_certificate,
allow_redirects=ws.allow_redirects,
timeout=ws.timeout)
return Response(response.json())