This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
pii-manager-poc/manager/core/views.py

187 lines
7.0 KiB
Python

# pii manager - proof-of-concept implementation
# Copyright (C) 2021 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from django.core.exceptions import MultipleObjectsReturned
from django.http import HttpResponseBadRequest, HttpResponseRedirect
from django.shortcuts import render
from django.utils.encoding import force_text
from django.views.generic import FormView, TemplateView, View
from jwcrypto.common import JWException
from jwcrypto.jwk import JWK
from jwcrypto.jwt import JWT
from rest_framework.response import Response
from rest_framework.views import APIView
from manager.core.models import AuthorizationCode, GenericProvider, GenericSource
class ResourceView(APIView):
    """Resource endpoint: return PII to a caller presenting an access token.

    Proof-of-concept code: several steps are still unimplemented. Names this
    module neither defines nor imports are flagged with ``NOTE(review)``
    comments where they are used.
    """

    def get(self, request, *args, **kwargs):
        """Handle a ``GET`` request for a resource.

        Query parameters read: ``token``, ``resource_id``,
        ``pii_principal_id`` and ``scopes`` (whitespace-separated).

        Returns a ``Response`` carrying ``{'err': 0, 'data': ...}`` on
        success, or ``{'err': <code>, ...}`` with status 403/404 on failure.
        """
        resources = {}

        # 1. retrieve access token
        # The token is expected to have at least three dot-separated
        # segments; reject anything shorter instead of crashing on unpacking.
        segments = request.GET.get('token', '').split('.')
        if len(segments) < 3:
            return Response({'err': 'invalid_token'}, status=403)
        sig = segments[2]

        # 2. local token verification
        try:
            # NOTE(review): verification is still a stub -- an empty JWT is
            # built, no key is supplied and the received token is never
            # deserialized. Confirm the call below against the jwcrypto API.
            token = JWT()  # todo
            token.verify(sig)
        except JWException:
            return Response({'err': 'invalid_token'}, status=403)

        try:
            # NOTE(review): ``JWT.fields.provider_uri`` is looked up on the JWT
            # class itself, not on the received token -- placeholder until the
            # token claims are actually decoded.
            source = GenericProvider.objects.get(url=JWT.fields.provider_uri)
        except GenericProvider.DoesNotExist:
            return Response({'err': 'unknown_provider'}, status=403)
        except MultipleObjectsReturned:
            return Response({'err': 'invalid_provider'}, status=403)

        resource_id = request.GET.get('resource_id')
        user_id = request.GET.get('pii_principal_id')
        if not resource_id:
            return Response({'err': 'unknown_resource'}, status=404)
        if not user_id:
            return Response({'err': 'invalid_pii_principal_id'}, status=403)

        # NOTE(review): ConsentReceipt is not among this module's imports.
        consent_receipts = ConsentReceipt.objects.filter(
            pii_identifier=resource_id, pii_principal_id=user_id
        )
        requested_scopes = request.GET.get('scopes', '').split()
        if not consent_receipts:
            # authorization required, user interaction needed
            # xxx not a redirect, fail and mention authorization url
            # NOTE(review): authorization_url is not defined in this view.
            return Response(
                {
                    'err': 'user_authorization_required',
                    'url': authorization_url,
                    'scopes': requested_scopes,
                },
                status=403,
            )
        # if not request.GET.get('user_id') -> error

        # Collect the scopes of every matching receipt into one flat set so
        # that the set difference below is well-defined.
        # NOTE(review): assumes ``scopes_set`` is a whitespace-separated
        # string, as the ``.split()`` call implies -- confirm on the model.
        previously_granted_scopes = set()
        for receipt in consent_receipts:
            previously_granted_scopes.update(receipt.scopes_set.split())

        # 3. online verification?
        # NOTE(review): ``service``, ``is_online_verifiable`` and
        # ``perform_online_verification`` are not defined in this module;
        # ``service`` is presumably the provider fetched above -- confirm.
        if is_online_verifiable(service):
            is_verified = perform_online_verification(token, service)
            if not is_verified:
                return Response(
                    {'err': 'invalid_token'},  # xxx mention that online verification has been performed
                    status=403,
                )

        if diff_scopes := set(requested_scopes) - previously_granted_scopes:
            # todo broken here
            # uma logic and scope reduction algo
            # 3.a. Scope translation
            #      We need to define a new mapping object, preferably model
            # 3.b. Set inclusion in service's authorized scopes in stored CRs
            #      Check if compatible with our current CR model
            # 3.c. Perform No/maybe decision
            # NOTE(review): ``perform_uma_decision`` is not defined here.
            if not perform_uma_decision(service):
                # Same convention as above: fail and point at the
                # authorization url rather than redirecting an API caller.
                return Response(
                    {
                        'err': 'user_authorization_required',
                        'url': authorization_url,
                        'scopes': sorted(diff_scopes),
                    },
                    status=403,
                )
            else:
                # NOTE(review): ``m`` is undefined -- this statement looks
                # truncated; the reduced scope list still has to be computed.
                requested_scopes = m

        # 4. reverse scope reduction
        # 4.a. retrieve source(s) hosting pii
        # 4.b. call each target source's backend
        # 4.c. return available pii to the caller
        return Response(
            {
                'err': 0,
                'data': resources,
            }
        )


resource = ResourceView.as_view()
class AuthorizeView(TemplateView):
    """Authorization endpoint: ask the user to approve a service's request.

    NOTE(review): ``success_url``, ``form_valid`` and the ``super().post()``
    call below are ``FormView`` API that ``TemplateView`` does not provide;
    the base class probably has to become ``FormView`` once a form class
    exists -- confirm before relying on POST handling.
    """

    template_name = 'manager/core/authorize.html'
    success_url = '/'

    def redirect(self, **kwargs):
        """Redirect to the client's ``redirect_uri`` with ``kwargs`` attached."""
        # NOTE(review): make_url is not defined or imported in this module.
        return HttpResponseRedirect(make_url(self.redirect_uri, **kwargs))

    def dispatch(self, request, *args, **kwargs):
        """Validate the mandatory request parameters, then dispatch as usual.

        Answers 400 when ``redirect_uri``, ``client_id`` or ``response_type``
        is missing, and redirects back to the client with
        ``error=unauthorized_client`` when the client is unknown.
        """
        # mandatory request parameters
        self.redirect_uri = request.GET.get('redirect_uri')
        if not self.redirect_uri:
            return HttpResponseBadRequest('missing redirect_uri parameter')
        client_id = request.GET.get('client_id')
        if not client_id:
            return HttpResponseBadRequest('missing client_id parameter')
        response_type = request.GET.get('response_type')
        if not response_type:
            return HttpResponseBadRequest('missing response_type parameter')
        try:
            # NOTE(review): Service is not among this module's imports.
            self.service = Service.objects.get(client_id=client_id)
            # todo check redirect uri against client's uris
        except Service.DoesNotExist:
            return self.redirect(error='unauthorized_client')
        self.state = request.GET.get('state')
        return super().dispatch(request, *args, **kwargs)

    def form_valid(self, request):
        """Send the user back to the client with an authorization code."""
        scopes = request.POST.get('scopes')
        # xxx required claims from scopes?
        # 1. create consent receipt
        # 2. generate authorization code
        # NOTE(review): steps 1 and 2 are not implemented yet, so
        # ``authorization_code`` is still undefined at this point.
        # The parameters go through self.redirect() because
        # HttpResponseRedirect itself does not take them as keyword arguments.
        return self.redirect(code=authorization_code, state=self.state)

    def post(self, request, *args, **kwargs):
        """Handle the consent form; a ``cancel`` field denies access."""
        if 'cancel' in request.POST:
            return self.redirect(error='access_denied')
        return super().post(request, *args, **kwargs)

    # todo get_form_kwargs, user details?

    def get_context_data(self, **kwargs):
        """Expose the requesting service to the template."""
        ctx = super().get_context_data(**kwargs)
        ctx['service'] = self.service
        # todo display scopes?
        return ctx


authorize = AuthorizeView.as_view()
class TokenView(APIView):
    """Token endpoint: exchange an authorization code for an access token."""

    def post(self, request, *args, **kwargs):
        """Issue an access token for a valid authorization code.

        Reads ``grant_type`` and ``code`` from the request body.

        Returns a ``Response`` with ``{'token': ...}`` on success, or
        ``{'error': <code>}`` with status 400 when a parameter is missing,
        the grant type is unsupported or the code is not recognised.
        """
        # Missing parameters are reported as a client error instead of
        # surfacing as an unhandled KeyError.
        grant_type = request.data.get('grant_type')
        code = request.data.get('code')
        if not grant_type:
            return Response({'error': 'invalid_request'}, status=400)
        if grant_type != 'authorization_code':  # todo uma grant
            return Response({'error': 'unsupported_grant_type'}, status=400)
        if not code:
            return Response({'error': 'invalid_request'}, status=400)

        # validate authorization code
        try:
            authorization_code = AuthorizationCode.objects.get(value=code)
        except (AuthorizationCode.DoesNotExist, MultipleObjectsReturned):
            # An ambiguous code is treated like an unknown one.
            return Response({'error': 'invalid_grant'}, status=400)
        # todo check token lifetime

        # issue access token
        token = authorization_code.generate_token()
        # NOTE(review): force_text was removed in Django 4.0 in favour of
        # force_str -- update the module import when upgrading Django.
        return Response({'token': force_text(token)})  # todo 'expires' param


token = TokenView.as_view()