204 lines
7.3 KiB
Python
204 lines
7.3 KiB
Python
# -*- coding: utf-8 -*-
|
|
# authentic2_cut - Authentic2 plugin for CUT
|
|
# Copyright (C) 2017 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import base64
|
|
import uuid
|
|
|
|
from django.core.files.base import ContentFile
|
|
from django.db.transaction import atomic
|
|
from django.conf import settings
|
|
from django.db import IntegrityError
|
|
from django.core.exceptions import ValidationError
|
|
|
|
from rest_framework.generics import ListAPIView
|
|
from rest_framework import serializers, pagination
|
|
from rest_framework.decorators import action
|
|
from rest_framework.response import Response
|
|
from rest_framework import status
|
|
|
|
from django_filters.rest_framework import FilterSet
|
|
|
|
from authentic2_idp_oidc.utils import make_pairwise_reversible_sub
|
|
from authentic2 import api_views
|
|
|
|
from . import models, utils
|
|
|
|
|
|
class Base64ImageField(serializers.FileField):
|
|
def to_internal_value(self, data):
|
|
# "unicode/str Python3" data, means base64 encoded image - decode
|
|
if not hasattr(data, 'isdecimal'):
|
|
raise ValidationError('must be base64 string')
|
|
try:
|
|
content = base64.b64decode(data)
|
|
except ValueError:
|
|
raise ValidationError('invalid base64')
|
|
identifier = uuid.uuid4()
|
|
data = ContentFile(content, name=identifier.urn[9:] + '.jpg')
|
|
return super(Base64ImageField, self).to_internal_value(data)
|
|
|
|
|
|
class CUTValidateAttachmentSerializer(serializers.Serializer):
|
|
b64_content = Base64ImageField()
|
|
|
|
|
|
class CUTCreateValidateSerializer(serializers.Serializer):
|
|
justificatifs = serializers.ListField(
|
|
child=CUTValidateAttachmentSerializer(),
|
|
allow_empty=False)
|
|
external_id = serializers.CharField()
|
|
|
|
|
|
def get_origin(request):
|
|
origin = request.user
|
|
if hasattr(origin, 'oidc_client'):
|
|
origin = origin.oidc_client
|
|
return origin
|
|
|
|
|
|
class CUTValidateSerializer(serializers.ModelSerializer):
|
|
sub = serializers.SerializerMethodField()
|
|
|
|
def get_sub(self, instance):
|
|
request = self.context['request']
|
|
origin = get_origin(request)
|
|
if hasattr(origin, 'identifier_policy'):
|
|
oidc_client = origin
|
|
if oidc_client.identifier_policy == oidc_client.POLICY_PAIRWISE_REVERSIBLE:
|
|
return make_pairwise_reversible_sub(oidc_client, instance.user)
|
|
else:
|
|
return ''
|
|
return instance.user.uuid
|
|
|
|
class Meta:
|
|
model = models.ValidationRequest
|
|
fields = ('id', 'created', 'external_id', 'status', 'reason', 'validated', 'sub')
|
|
|
|
|
|
def _get_cut_validation_accepted_mime_types():
|
|
return getattr(settings, 'CUT_VALIDATION_ACCEPTED_MIME_TYPES', [
|
|
'image/jpeg',
|
|
'application/pdf',
|
|
'image/png',
|
|
])
|
|
|
|
|
|
@action(detail=True, methods=['get', 'post'], url_path='validate',
|
|
permission_classes=(api_views.DjangoPermission('custom_user.cut_validate_user'),))
|
|
def validate_cut(self, request, uuid):
|
|
user = self.get_object()
|
|
origin = get_origin(request)
|
|
|
|
if request.method == 'GET':
|
|
qs = models.ValidationRequest.objects.for_origin(origin).filter(user=user).select_related('user')
|
|
serializer = CUTValidateSerializer(qs, many=True, context={'request': request})
|
|
return Response({
|
|
'result': 1,
|
|
'next': None,
|
|
'previous': None,
|
|
'results': serializer.data,
|
|
})
|
|
serializer = CUTCreateValidateSerializer(data=request.data)
|
|
max_size = getattr(settings, 'CUT_VALIDATION_ATTACHMENT_MAX_SIZE', 300 * 1024 * 1024)
|
|
if serializer.is_valid():
|
|
errors = []
|
|
attachments = serializer.validated_data['justificatifs']
|
|
external_id = serializer.validated_data['external_id']
|
|
for i, attachment in enumerate(attachments):
|
|
content = attachment['b64_content']
|
|
if content.size > max_size:
|
|
errors.append({
|
|
'code': 'justificatifs-too-big',
|
|
'page': i,
|
|
'max-size': max_size,
|
|
})
|
|
accepted_mime_types = _get_cut_validation_accepted_mime_types()
|
|
mime_type = utils.mime_type_from_buffer(content.read(10000))
|
|
if mime_type not in accepted_mime_types:
|
|
errors.append({
|
|
'code': 'justificatifs-bad-format',
|
|
'page': i,
|
|
'accepted': accepted_mime_types,
|
|
})
|
|
content.name = content.name.rsplit('.', 1)[0] + '.' + mime_type.rsplit('/', 1)[1]
|
|
# rewind cursor
|
|
content.seek(0)
|
|
if models.ValidationRequest.objects.filter(
|
|
user=user, external_id=external_id).exists():
|
|
errors.append({
|
|
'code': 'already-exists',
|
|
})
|
|
if errors:
|
|
response = {'result': 0, 'errors': errors}
|
|
return Response(response, status.HTTP_400_BAD_REQUEST)
|
|
with atomic():
|
|
try:
|
|
validation_request = models.ValidationRequest.objects.create(
|
|
user=user, external_id=external_id, origin=origin)
|
|
except IntegrityError:
|
|
response = {'result': 0, 'errors': [{'code': 'already-exists'}]}
|
|
return Response(response, status.HTTP_400_BAD_REQUEST)
|
|
for attachment in attachments:
|
|
models.ValidationRequestAttachment.objects.create(
|
|
validation_request=validation_request,
|
|
image=attachment['b64_content'])
|
|
return Response({
|
|
'result': 1,
|
|
'status': 'received',
|
|
'sub': uuid,
|
|
'id': validation_request.id,
|
|
'external_id': external_id,
|
|
}, status=status.HTTP_201_CREATED)
|
|
else:
|
|
errors = []
|
|
for key in serializer.errors:
|
|
errors.append({'code': 'schema-error', 'field': key, 'sub-errors': serializer.errors[key]})
|
|
response = {'result': 0, 'errors': errors}
|
|
return Response(response, status.HTTP_400_BAD_REQUEST)
|
|
|
|
# attach new API to users' api
|
|
api_views.UsersAPI.validate_cut = validate_cut
|
|
|
|
|
|
class ValidateFilter(FilterSet):
|
|
class Meta:
|
|
model = models.ValidationRequest
|
|
fields = {
|
|
'validated': {
|
|
'gt',
|
|
}
|
|
}
|
|
|
|
|
|
class ValidateAPI(api_views.ExceptionHandlerMixin, ListAPIView):
|
|
methods = ['GET']
|
|
ordering_fields = ['id']
|
|
serializer_class = CUTValidateSerializer
|
|
pagination_class = pagination.CursorPagination
|
|
ordering = ['id']
|
|
filter_class = ValidateFilter
|
|
permission_classes = (api_views.DjangoPermission('custom_user.cut_validate_user'),)
|
|
|
|
def get_queryset(self):
|
|
qs = models.ValidationRequest.objects
|
|
origin = get_origin(self.request)
|
|
qs = qs.for_origin(origin).select_related('user')
|
|
return qs
|
|
|
|
|
|
validate = ValidateAPI.as_view()
|