fargo/fargo/fargo/api_views.py

174 lines
6.2 KiB
Python

# fargo - document box
# Copyright (C) 2015-2016 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime

from django.conf import settings
from django.contrib.auth.models import User
from django.utils import timezone
from django.utils.text import slugify
from rest_framework import exceptions, filters, serializers, status
from rest_framework.generics import GenericAPIView, ListAPIView
from rest_framework.permissions import IsAdminUser
from rest_framework.response import Response

from . import api_errors, api_fields, utils
from .models import Document, Origin, UserDocument

try:
    from mellon.models import UserSAMLIdentifier
except ImportError:
    # mellon is optional: without it the name stays bound to None.
    UserSAMLIdentifier = None
class UserSerializerMixin(serializers.Serializer):
    """Serializer mixin that resolves the user targeted by an API call.

    The caller identifies the user with ``user_email`` or ``user_nameid``.
    Object-level validation removes both inputs from the validated data and
    stores the matching user under the ``user`` key instead.
    """

    user_email = serializers.CharField(write_only=True, required=False)
    user_nameid = serializers.CharField(write_only=True, required=False)

    def validate(self, data):
        """Swap the identifier fields for the resolved ``user``.

        ``user_email`` wins when both identifiers are supplied.

        Raises:
            api_errors.APIError: ``MISSING_USER`` when neither identifier
                carries a non-empty value.
        """
        if not (data.get('user_email') or data.get('user_nameid')):
            raise api_errors.APIError('MISSING_USER')
        # NOTE(review): these lookups repeat the ones done by the field-level
        # validators below; DRF is expected to have run those first, so the
        # lookups should not fail here -- confirm against the DRF version used.
        if data.get('user_email'):
            user = User.objects.get(email=data.get('user_email'))
        elif data.get('user_nameid'):
            user = UserSAMLIdentifier.objects.get(name_id=data.get('user_nameid')).user
        data.pop('user_email', None)
        data.pop('user_nameid', None)
        data['user'] = user
        return data

    def validate_user_email(self, value):
        """Check that exactly one user has the e-mail address ``value``.

        Raises:
            api_errors.APIError: ``TOO_MANY_USERS`` when several users share
                the address, ``UNKNOWN_USER`` when none has it.
        """
        try:
            User.objects.get(email=value)
        except User.MultipleObjectsReturned:
            raise api_errors.APIError('TOO_MANY_USERS')
        except User.DoesNotExist:
            raise api_errors.APIError('UNKNOWN_USER')
        return value

    def validate_user_nameid(self, value):
        """Check that exactly one SAML identifier has the name id ``value``.

        Raises:
            AssertionError: when the optional ``mellon`` import failed, so no
                identifier model is available.
            api_errors.APIError: ``UNKNOWN_USER`` when no identifier matches,
                ``TOO_MANY_USERS`` when several do.
        """
        if value:
            if not UserSAMLIdentifier:
                # Explicit raise instead of ``assert``: assert statements are
                # stripped under ``python -O``, which would let the lookup
                # below fail with an AttributeError on None.
                raise AssertionError('nameid lookups require django-mellon')
            try:
                UserSAMLIdentifier.objects.get(name_id=value)
            except UserSAMLIdentifier.DoesNotExist:
                raise api_errors.APIError('UNKNOWN_USER')
            except UserSAMLIdentifier.MultipleObjectsReturned:
                # Mirror validate_user_email: an ambiguous identifier is a
                # client-visible validation error, not a server error.
                raise api_errors.APIError('TOO_MANY_USERS')
        return value
class CommonAPIMixin:
    """View mixin that puts every response into a ``result`` envelope.

    Successful payloads become ``{'result': 1, 'data': ...}`` and API
    exception details become ``{'result': 0, 'errors': ...}``.
    """

    def handle_exception(self, exc):
        """Wrap the detail of API exceptions, then defer to the parent handler.

        Exceptions that are not ``APIException`` instances are forwarded
        unchanged.
        """
        is_api_exception = isinstance(exc, exceptions.APIException)
        if is_api_exception:
            original_detail = exc.detail
            exc.detail = {'result': 0, 'errors': original_detail}
        return super().handle_exception(exc)

    def finalize_response(self, request, response, *args, **kwargs):
        """Wrap the response data unless it already carries a ``result`` key."""
        payload = response.data
        already_wrapped = isinstance(payload, dict) and 'result' in payload
        if not already_wrapped:
            response.data = {'result': 1, 'data': payload}
        return super().finalize_response(request, response, *args, **kwargs)
def get_max_size():
    """Return the ``FARGO_MAX_DOCUMENT_SIZE`` setting, read at call time."""
    max_document_size = settings.FARGO_MAX_DOCUMENT_SIZE
    return max_document_size
class PushDocumentSerializer(UserSerializerMixin):
    """Input serializer for pushing a base64-encoded file into a user's box."""

    origin = serializers.CharField(required=True)
    # max_size receives the callable itself, not its result.
    file_b64_content = api_fields.Base64FileField(required=True, max_size=get_max_size)
    file_name = serializers.CharField(required=False)
    deletable_by_user = serializers.BooleanField(required=False, default=True)

    def validate(self, data):
        """Resolve the user, then reject pushes that would overflow the box.

        Raises:
            api_errors.APIError: ``BOX_IS_FULL`` when the space already used
                by the user plus the new file exceeds
                ``FARGO_MAX_DOCUMENT_BOX_SIZE``.
        """
        data = super().validate(data)
        owner = data['user']
        space_after_push = Document.objects.used_space(owner) + data['file_b64_content'].size
        box_limit = settings.FARGO_MAX_DOCUMENT_BOX_SIZE
        if space_after_push > box_limit:
            raise api_errors.APIError('BOX_IS_FULL', limit=str(box_limit))
        return data
class PushDocument(CommonAPIMixin, GenericAPIView):
    """Admin-only endpoint that stores a pushed document for a user."""

    serializer_class = PushDocumentSerializer
    permission_classes = (IsAdminUser,)

    def post(self, request, format=None):
        """Validate the payload and attach the document to the user.

        Raises:
            serializers.ValidationError: when the payload does not validate.
            api_errors.APIError: ``DOCUMENT_EXISTS`` when a matching user
                document already exists.
        """
        serializer = self.get_serializer(data=request.data)
        if not serializer.is_valid():
            raise serializers.ValidationError(serializer.errors)

        payload = serializer.validated_data
        if 'file_name' in payload:
            # Replace slashes with dashes in the caller-supplied name.
            payload['file_name'] = payload['file_name'].replace('/', '-')
        file_name = payload.get('file_name')

        origin_label = payload.get('origin')
        origin, _ = Origin.objects.get_or_create(
            slug=slugify(origin_label),
            defaults={'label': origin_label},
        )

        upload = payload['file_b64_content']
        if file_name:
            upload.name = file_name

        # Documents are looked up by content hash, so identical content is
        # matched to an existing Document row instead of creating a new one.
        digest = utils.sha256_of_file(upload)
        document, _ = Document.objects.get_or_create(
            content_hash=digest,
            defaults={'content': upload},
        )

        user_document, is_new = UserDocument.objects.get_or_create(
            user=payload.get('user'),
            filename=file_name,
            document=document,
            origin=origin,
            deletable_by_user=payload.get('deletable_by_user'),
        )
        if not is_new:
            raise api_errors.APIError('DOCUMENT_EXISTS')

        # NOTE(review): get_or_create has already persisted a new row; this
        # extra save() looks redundant but is kept to preserve behaviour.
        user_document.save()
        return Response(None, status.HTTP_200_OK)
# View callable built from PushDocument (presumably referenced by the URLconf -- confirm).
push_document = PushDocument.as_view()
class UserDocumentSerializer(serializers.Serializer):
    """Render a user document as a label plus an absolute download URL."""

    def to_representation(self, obj):
        """Return ``{'label': ..., 'url': ...}`` for ``obj``.

        The URL is made absolute with the request found in the serializer
        context.
        """
        label = obj.filename
        request = self.context['request']
        absolute_url = request.build_absolute_uri(obj.get_download_url())
        return {'label': label, 'url': absolute_url}
class RecentDocuments(ListAPIView):
    """List the requesting user's most recently created documents.

    Only documents created within the last ``recent_days`` days are
    returned, newest first, capped at ``max_documents`` entries.
    """

    serializer_class = UserDocumentSerializer
    # Window and cap are class attributes so subclasses can override them.
    recent_days = 14
    max_documents = 10

    def get_queryset(self):
        """Return the newest documents of ``request.user`` inside the window."""
        # timezone.now() is aware when USE_TZ is enabled and naive otherwise,
        # so the cutoff always matches how Django stores datetimes; the
        # previous naive datetime.datetime.now() did not when USE_TZ is on.
        cutoff = timezone.now() - datetime.timedelta(days=self.recent_days)
        recent = UserDocument.objects.filter(user=self.request.user, created__gt=cutoff)
        return recent.order_by('-created')[: self.max_documents]
# View callable built from RecentDocuments (presumably referenced by the URLconf -- confirm).
recent_documents = RecentDocuments.as_view()
class FilterByUser(filters.BaseFilterBackend):
    """Filter backend narrowing a queryset to one user via query parameters."""

    def filter_queryset(self, request, queryset, view):
        """Filter by ``user_email`` or, failing that, ``user_nameid``.

        The queryset is returned unchanged when neither parameter is present
        in the query string.
        """
        query = request.GET
        # Checked in order: the first parameter present decides the filter.
        lookups = (
            ('user_email', 'user__email'),
            ('user_nameid', 'user__saml_identifiers__name_id'),
        )
        for param, lookup in lookups:
            if param in query:
                return queryset.filter(**{lookup: query[param]})
        return queryset