authentic2-cut/src/authentic2_cut/api_views.py

203 lines
7.2 KiB
Python

# -*- coding: utf-8 -*-
# authentic2_cut - Authentic2 plugin for CUT
# Copyright (C) 2017 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import magic
import uuid
from django.core.files.base import ContentFile
from django.db.transaction import atomic
from django.conf import settings
from django.db import IntegrityError
from django.core.exceptions import ValidationError
from rest_framework.generics import ListAPIView
from rest_framework import serializers, pagination
from rest_framework.decorators import detail_route
from rest_framework.response import Response
from rest_framework import status
from django_filters.rest_framework import FilterSet
from authentic2_idp_oidc.utils import make_pairwise_reversible_sub
from authentic2 import api_views
from . import models
class Base64ImageField(serializers.FileField):
    """File field whose input is the file content as a base64 text string.

    The decoded bytes are wrapped in a ``ContentFile`` carrying a random,
    UUID-based name and are then handed to ``FileField`` for its own checks.
    """

    def to_internal_value(self, data):
        """Decode *data* from base64 and validate it as a file.

        Raises ``ValidationError`` when *data* is not a text string or when
        it cannot be decoded as base64.
        """
        # Text strings (``str`` on Python 3, ``unicode`` on Python 2) are
        # recognised by their ``isdecimal`` method; anything else is refused.
        if not hasattr(data, 'isdecimal'):
            raise ValidationError('must be base64 string')
        try:
            content = base64.b64decode(data)
        except (TypeError, ValueError):
            # Python 3 reports undecodable input with binascii.Error (a
            # ValueError subclass) while Python 2 raises TypeError.
            raise ValidationError('invalid base64')
        # NOTE(review): the generated name always ends in ".jpg", whatever
        # the real type of the decoded content is.
        name = str(uuid.uuid4()) + '.jpg'
        data = ContentFile(content, name=name)
        return super(Base64ImageField, self).to_internal_value(data)
class CUTValidateAttachmentSerializer(serializers.Serializer):
    """One attachment of a validation request, sent as base64 content."""

    # Decoded into a file object by Base64ImageField.
    b64_content = Base64ImageField()
class CUTCreateValidateSerializer(serializers.Serializer):
    """Payload accepted when creating a validation request.

    It carries a non-empty list of attachments (``justificatifs``) and the
    caller's ``external_id`` for the request.
    """

    external_id = serializers.CharField()
    justificatifs = serializers.ListField(
        allow_empty=False,
        child=CUTValidateAttachmentSerializer(),
    )
def get_origin(request):
    """Return the origin of *request*.

    This is ``request.user.oidc_client`` when the user object has such an
    attribute, and ``request.user`` itself otherwise.
    """
    requester = request.user
    if not hasattr(requester, 'oidc_client'):
        return requester
    return requester.oidc_client
class CUTValidateSerializer(serializers.ModelSerializer):
    """Read serializer for ``ValidationRequest`` with a computed ``sub``."""

    sub = serializers.SerializerMethodField()

    class Meta:
        model = models.ValidationRequest
        fields = (
            'id',
            'created',
            'external_id',
            'status',
            'reason',
            'validated',
            'sub',
        )

    def get_sub(self, instance):
        """Return the subject identifier of ``instance.user`` for the caller.

        When the request origin has no ``identifier_policy`` attribute the
        user's ``uuid`` is returned. Otherwise the origin is treated as an
        OIDC client: a pairwise reversible sub is computed if that is the
        client's policy, and an empty string is returned for any other
        policy.
        """
        client = get_origin(self.context['request'])
        if not hasattr(client, 'identifier_policy'):
            return instance.user.uuid
        if client.identifier_policy == client.POLICY_PAIRWISE_REVERSIBLE:
            return make_pairwise_reversible_sub(client, instance.user)
        return ''
def _get_cut_validation_accepted_mime_types():
    """Return the MIME types accepted for validation attachments.

    The ``CUT_VALIDATION_ACCEPTED_MIME_TYPES`` setting is used when it is
    defined; otherwise JPEG, PDF and PNG are accepted.
    """
    fallback = ['image/jpeg', 'application/pdf', 'image/png']
    return getattr(settings, 'CUT_VALIDATION_ACCEPTED_MIME_TYPES', fallback)
@detail_route(methods=['get', 'post'], url_path='validate',
              permission_classes=(api_views.DjangoPermission('custom_user.cut_validate_user'),))
def validate_cut(self, request, uuid):
    """List (GET) or create (POST) the validation requests of one user.

    GET returns the user's validation requests visible to the request
    origin, serialized with ``CUTValidateSerializer``.

    POST expects a ``CUTCreateValidateSerializer`` payload. Each attachment
    is checked against the ``CUT_VALIDATION_ATTACHMENT_MAX_SIZE`` setting and
    the accepted MIME types, and the ``external_id`` must not already be used
    for this user. Any failure yields a 400 response with ``result: 0`` and a
    list of error dicts; on success the request and its attachments are
    stored in one transaction and a 201 response is returned.

    ``uuid`` is the value received by the route; it is echoed back as ``sub``
    in the 201 response. Inside this function it shadows the ``uuid`` module.
    """
    user = self.get_object()
    origin = get_origin(request)

    if request.method == 'GET':
        qs = models.ValidationRequest.objects.for_origin(origin).filter(user=user).select_related('user')
        serializer = CUTValidateSerializer(qs, many=True, context={'request': request})
        return Response({
            'result': 1,
            'next': None,
            'previous': None,
            'results': serializer.data,
        })

    serializer = CUTCreateValidateSerializer(data=request.data)
    if not serializer.is_valid():
        # Report every invalid field of the payload as a schema error.
        errors = [
            {'code': 'schema-error', 'field': key, 'sub-errors': serializer.errors[key]}
            for key in serializer.errors
        ]
        return Response({'result': 0, 'errors': errors}, status.HTTP_400_BAD_REQUEST)

    max_size = getattr(settings, 'CUT_VALIDATION_ATTACHMENT_MAX_SIZE', 300 * 1024 * 1024)
    # Loop-invariant: look the accepted types up once for all attachments.
    accepted_mime_types = _get_cut_validation_accepted_mime_types()
    attachments = serializer.validated_data['justificatifs']
    external_id = serializer.validated_data['external_id']

    errors = []
    for i, attachment in enumerate(attachments):
        content = attachment['b64_content']
        if content.size > max_size:
            errors.append({
                'code': 'justificatifs-too-big',
                'page': i,
                'max-size': max_size,
            })
        # The MIME type is sniffed from the first bytes of the content.
        if magic.from_buffer(content.read(10000), mime=True) not in accepted_mime_types:
            errors.append({
                'code': 'justificatifs-bad-format',
                'page': i,
                'accepted': accepted_mime_types,
            })
        # rewind cursor so the file can be read again when it is saved
        content.seek(0)
    if models.ValidationRequest.objects.filter(
            user=user, external_id=external_id).exists():
        errors.append({
            'code': 'already-exists',
        })
    if errors:
        return Response({'result': 0, 'errors': errors}, status.HTTP_400_BAD_REQUEST)

    with atomic():
        try:
            # The creation runs in its own nested atomic block (a savepoint)
            # and the IntegrityError is caught outside of it, so the
            # enclosing transaction is left in a usable state.
            # Presumably the error comes from a uniqueness constraint hit by
            # a concurrent request that passed the exists() check above.
            with atomic():
                validation_request = models.ValidationRequest.objects.create(
                    user=user, external_id=external_id, origin=origin)
        except IntegrityError:
            response = {'result': 0, 'errors': [{'code': 'already-exists'}]}
            return Response(response, status.HTTP_400_BAD_REQUEST)
        for attachment in attachments:
            models.ValidationRequestAttachment.objects.create(
                validation_request=validation_request,
                image=attachment['b64_content'])
    return Response({
        'result': 1,
        'status': 'received',
        'sub': uuid,
        'id': validation_request.id,
        'external_id': external_id,
    }, status=status.HTTP_201_CREATED)
# Attach the new endpoint to the users' API: validate_cut is set as an
# attribute of api_views.UsersAPI when this module is imported.
api_views.UsersAPI.validate_cut = validate_cut
class ValidateFilter(FilterSet):
    """Filter set exposing a ``gt`` lookup on ``ValidationRequest.validated``."""

    class Meta:
        model = models.ValidationRequest
        fields = {'validated': {'gt'}}
class ValidateAPI(api_views.ExceptionHandlerMixin, ListAPIView):
    """List endpoint for the validation requests of the request origin.

    Results are serialized with ``CUTValidateSerializer``, filtered through
    ``ValidateFilter`` and cursor-paginated in ``id`` order.
    """

    permission_classes = (api_views.DjangoPermission('custom_user.cut_validate_user'),)
    methods = ['GET']
    serializer_class = CUTValidateSerializer
    filter_class = ValidateFilter
    pagination_class = pagination.CursorPagination
    ordering = ['id']
    ordering_fields = ['id']

    def get_queryset(self):
        """Return the validation requests for the origin of the request."""
        origin = get_origin(self.request)
        manager = models.ValidationRequest.objects
        return manager.for_origin(origin).select_related('user')
# Module-level view callable built from ValidateAPI.
validate = ValidateAPI.as_view()