authentic2-cut/src/authentic2_cut/api_views.py

217 lines
7.6 KiB
Python

# authentic2_cut - Authentic2 plugin for CUT
# Copyright (C) 2017 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import uuid
from authentic2 import api_views
from authentic2_idp_oidc.utils import make_pairwise_reversible_sub
from django.conf import settings
from django.core.exceptions import ValidationError
from django.core.files.base import ContentFile
from django.db import IntegrityError
from django.db.transaction import atomic
from django_filters.rest_framework import FilterSet
from rest_framework import pagination, serializers, status
from rest_framework.decorators import action
from rest_framework.generics import ListAPIView
from rest_framework.response import Response
from . import models, utils
class Base64ImageField(serializers.FileField):
    """File field whose incoming value is a base64-encoded string."""

    def to_internal_value(self, data):
        """Decode ``data`` from base64 and hand it on as an in-memory file.

        The decoded bytes are wrapped in a ``ContentFile`` named after a
        freshly generated UUID with a ``.jpg`` extension, then validated by
        the parent ``FileField``.

        Raises:
            ValidationError: if ``data`` has no ``isdecimal`` attribute
                (i.e. is not a text string) or cannot be base64-decoded.
        """
        # Text strings expose ``isdecimal``; anything without it is rejected.
        is_text = hasattr(data, 'isdecimal')
        if not is_text:
            raise ValidationError('must be base64 string')
        try:
            decoded = base64.b64decode(data)
        except ValueError:
            raise ValidationError('invalid base64')
        # str(UUID) is the URN form without its 'urn:uuid:' prefix.
        filename = '%s.jpg' % uuid.uuid4()
        return super().to_internal_value(ContentFile(decoded, name=filename))
class CUTValidateAttachmentSerializer(serializers.Serializer):
    """Schema of a single attachment: its content as a base64 string."""

    b64_content = Base64ImageField()
class CUTCreateValidateSerializer(serializers.Serializer):
    """Schema of the payload used to create a validation request."""

    # At least one attachment is required (allow_empty=False).
    justificatifs = serializers.ListField(
        child=CUTValidateAttachmentSerializer(),
        allow_empty=False,
    )
    # Identifier supplied by the caller for this request.
    external_id = serializers.CharField()
def get_origin(request):
    """Return the origin of ``request``.

    When the request's user carries an ``oidc_client`` attribute, that client
    is the origin; otherwise the user object itself is returned.
    """
    requester = request.user
    return requester.oidc_client if hasattr(requester, 'oidc_client') else requester
class CUTValidateSerializer(serializers.ModelSerializer):
    """Serialize a ``ValidationRequest`` together with a computed ``sub``."""

    sub = serializers.SerializerMethodField()

    def get_sub(self, instance):
        """Return the subject identifier of ``instance.user`` for the caller.

        - origin without an ``identifier_policy`` attribute: the user's uuid;
        - origin whose policy is ``POLICY_PAIRWISE_REVERSIBLE``: the pairwise
          reversible sub computed for that origin and user;
        - origin with any other policy: an empty string.
        """
        origin = get_origin(self.context['request'])
        if not hasattr(origin, 'identifier_policy'):
            return instance.user.uuid
        if origin.identifier_policy != origin.POLICY_PAIRWISE_REVERSIBLE:
            return ''
        return make_pairwise_reversible_sub(origin, instance.user)

    class Meta:
        model = models.ValidationRequest
        fields = ('id', 'created', 'external_id', 'status', 'reason', 'validated', 'sub')
def _get_cut_validation_accepted_mime_types():
    """Return the MIME types accepted for validation attachments.

    The ``CUT_VALIDATION_ACCEPTED_MIME_TYPES`` setting is used when defined;
    otherwise JPEG, PDF and PNG are accepted.
    """
    fallback = ['image/jpeg', 'application/pdf', 'image/png']
    return getattr(settings, 'CUT_VALIDATION_ACCEPTED_MIME_TYPES', fallback)
@action(
    detail=True,
    methods=['get', 'post'],
    url_path='validate',
    permission_classes=(api_views.DjangoPermission('custom_user.cut_validate_user'),),
)
def validate_cut(self, request, uuid):
    """List or create validation requests for the user addressed by the URL.

    GET returns the validation requests of that user that are visible to the
    request's origin, wrapped in a single-page envelope.

    POST expects ``external_id`` and a non-empty ``justificatifs`` list of
    base64 attachments. Each attachment is checked against the configured
    maximum size and accepted MIME types; every problem found is reported
    together in one 400 response. On success the validation request and its
    attachments are stored and a 201 response is returned.

    Args:
        request: the incoming REST framework request.
        uuid: user identifier taken from the URL; echoed back as ``sub`` in
            the 201 response.

    Returns:
        A ``Response``: default status for GET, 201 when created, 400 on a
        schema error, a rejected attachment or a duplicate ``external_id``.
    """
    user = self.get_object()
    origin = get_origin(request)

    if request.method == 'GET':
        qs = models.ValidationRequest.objects.for_origin(origin).filter(user=user).select_related('user')
        serializer = CUTValidateSerializer(qs, many=True, context={'request': request})
        return Response(
            {
                'result': 1,
                'next': None,
                'previous': None,
                'results': serializer.data,
            }
        )

    serializer = CUTCreateValidateSerializer(data=request.data)
    if not serializer.is_valid():
        errors = [
            {'code': 'schema-error', 'field': key, 'sub-errors': serializer.errors[key]}
            for key in serializer.errors
        ]
        return Response({'result': 0, 'errors': errors}, status.HTTP_400_BAD_REQUEST)

    max_size = getattr(settings, 'CUT_VALIDATION_ATTACHMENT_MAX_SIZE', 300 * 1024 * 1024)
    # Loop-invariant: look the accepted types up once for all attachments.
    accepted_mime_types = _get_cut_validation_accepted_mime_types()
    attachments = serializer.validated_data['justificatifs']
    external_id = serializer.validated_data['external_id']

    errors = []
    for i, attachment in enumerate(attachments):
        content = attachment['b64_content']
        if content.size > max_size:
            errors.append(
                {
                    'code': 'justificatifs-too-big',
                    'page': i,
                    'max-size': max_size,
                }
            )
        # Sniff the type from the first bytes, then rewind the cursor so the
        # file is later stored from its beginning.
        mime_type = utils.mime_type_from_buffer(content.read(10000))
        content.seek(0)
        if mime_type not in accepted_mime_types:
            errors.append(
                {
                    'code': 'justificatifs-bad-format',
                    'page': i,
                    'accepted': accepted_mime_types,
                }
            )
            # A rejected file is never stored: skip the renaming, which would
            # otherwise fail on a detected type that has no '/' in it.
            continue
        # Replace the placeholder extension with the detected subtype.
        content.name = content.name.rsplit('.', 1)[0] + '.' + mime_type.rpartition('/')[2]

    if models.ValidationRequest.objects.filter(user=user, external_id=external_id).exists():
        errors.append(
            {
                'code': 'already-exists',
            }
        )
    if errors:
        response = {'result': 0, 'errors': errors}
        return Response(response, status.HTTP_400_BAD_REQUEST)

    with atomic():
        try:
            # The inner atomic() is a savepoint: when the insert fails (e.g. a
            # concurrent request created the same row after the exists() check
            # above) only the savepoint is rolled back, leaving the enclosing
            # transaction usable when the exception is handled here.
            with atomic():
                validation_request = models.ValidationRequest.objects.create(
                    user=user, external_id=external_id, origin=origin
                )
        except IntegrityError:
            response = {'result': 0, 'errors': [{'code': 'already-exists'}]}
            return Response(response, status.HTTP_400_BAD_REQUEST)
        for attachment in attachments:
            models.ValidationRequestAttachment.objects.create(
                validation_request=validation_request, image=attachment['b64_content']
            )
    return Response(
        {
            'result': 1,
            'status': 'received',
            'sub': uuid,
            'id': validation_request.id,
            'external_id': external_id,
        },
        status=status.HTTP_201_CREATED,
    )
# Attach the new action to the users API: UsersAPI is defined in authentic2's
# api_views module, so validate_cut is added to it by assignment.
api_views.UsersAPI.validate_cut = validate_cut
class ValidateFilter(FilterSet):
    """Filter set for validation requests: ``validated`` with the ``gt`` lookup."""

    class Meta:
        model = models.ValidationRequest
        fields = {'validated': {'gt'}}
class ValidateAPI(api_views.ExceptionHandlerMixin, ListAPIView):
    """Cursor-paginated listing of the validation requests of the caller's origin."""

    permission_classes = (api_views.DjangoPermission('custom_user.cut_validate_user'),)
    serializer_class = CUTValidateSerializer
    pagination_class = pagination.CursorPagination
    # NOTE(review): recent django-filter releases read ``filterset_class``
    # instead of ``filter_class`` - confirm against the installed version.
    filter_class = ValidateFilter
    methods = ['GET']
    ordering = ['id']
    ordering_fields = ['id']

    def get_queryset(self):
        """Return the validation requests for the request's origin, with users preloaded."""
        origin = get_origin(self.request)
        return models.ValidationRequest.objects.for_origin(origin).select_related('user')
# Module-level view callable built from ValidateAPI (presumably the name the
# URL configuration refers to - confirm against the plugin's urls).
validate = ValidateAPI.as_view()