241 lines
9.2 KiB
Python
241 lines
9.2 KiB
Python
# fargo - document box
|
|
# Copyright (C) 2015-2016 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import datetime
|
|
|
|
from django.conf import settings
|
|
from django.urls import reverse
|
|
from django.contrib.auth.models import User
|
|
from django.utils.text import slugify
|
|
from django.utils.timezone import now
|
|
|
|
from rest_framework import serializers
|
|
from rest_framework.generics import GenericAPIView, ListAPIView
|
|
from rest_framework.permissions import IsAdminUser
|
|
from rest_framework.response import Response
|
|
from rest_framework import status, filters, mixins, viewsets, routers, exceptions
|
|
|
|
from .models import Origin, Document, UserDocument, Validation
|
|
from . import utils, api_errors, api_fields
|
|
|
|
try:
|
|
from mellon.models import UserSAMLIdentifier
|
|
except ImportError:
|
|
UserSAMLIdentifier = None
|
|
|
|
|
|
class UserSerializerMixin(serializers.Serializer):
    """Serializer mixin resolving the target user of an API call.

    The caller identifies the user with ``user_email`` and/or
    ``user_nameid``; after validation both keys are removed from the
    validated data and replaced by a ``user`` entry holding the matching
    ``User`` instance.  When both are supplied, ``user_email`` wins.
    """
    user_email = serializers.CharField(write_only=True, required=False)
    user_nameid = serializers.CharField(write_only=True, required=False)

    def validate(self, data):
        """Replace the identification fields by the resolved ``user``.

        Raises ``api_errors.APIError('MISSING_USER')`` when neither
        ``user_email`` nor ``user_nameid`` carries a non-empty value.
        """
        if not (data.get('user_email') or data.get('user_nameid')):
            raise api_errors.APIError('MISSING_USER')
        if data.get('user_email'):
            user = User.objects.get(email=data.get('user_email'))
        elif data.get('user_nameid'):
            user = UserSAMLIdentifier.objects.get(name_id=data.get('user_nameid')).user
        data.pop('user_email', None)
        data.pop('user_nameid', None)
        data['user'] = user
        return data

    def validate_user_email(self, value):
        """Check that exactly one user owns the email address ``value``.

        Raises ``api_errors.APIError`` with ``TOO_MANY_USERS`` when several
        users share the address and ``UNKNOWN_USER`` when none has it.
        """
        try:
            User.objects.get(email=value)
        except User.MultipleObjectsReturned:
            raise api_errors.APIError('TOO_MANY_USERS')
        except User.DoesNotExist:
            raise api_errors.APIError('UNKNOWN_USER')
        return value

    def validate_user_nameid(self, value):
        """Check that a SAML identifier with name id ``value`` exists.

        Raises ``AssertionError`` when django-mellon could not be imported
        (``UserSAMLIdentifier`` is then ``None``) and
        ``api_errors.APIError('UNKNOWN_USER')`` when no identifier matches.
        """
        if value:
            # Explicit check instead of an ``assert`` statement: asserts are
            # removed when Python runs with -O, which would turn a missing
            # optional dependency into an opaque AttributeError on None.
            # The exception type and message are kept for compatibility.
            if not UserSAMLIdentifier:
                raise AssertionError('nameid lookups require django-mellon')
            try:
                UserSAMLIdentifier.objects.get(name_id=value)
            except UserSAMLIdentifier.DoesNotExist:
                raise api_errors.APIError('UNKNOWN_USER')
        return value
|
|
|
|
|
|
class CommonAPIMixin(object):
    """View mixin wrapping every payload in a ``result`` envelope.

    Successful payloads become ``{'result': 1, 'data': ...}`` and API
    exception details become ``{'result': 0, 'errors': ...}``.
    """

    def handle_exception(self, exc):
        """Wrap the detail of API exceptions, then defer to the parent."""
        is_api_exception = isinstance(exc, exceptions.APIException)
        if is_api_exception:
            original_detail = exc.detail
            exc.detail = {'result': 0, 'errors': original_detail}
        parent = super(CommonAPIMixin, self)
        return parent.handle_exception(exc)

    def finalize_response(self, request, response, *args, **kwargs):
        """Wrap the response data unless it already has a ``result`` key."""
        payload = response.data
        already_wrapped = isinstance(payload, dict) and 'result' in payload
        if not already_wrapped:
            response.data = {'result': 1, 'data': payload}
        parent = super(CommonAPIMixin, self)
        return parent.finalize_response(request, response, *args, **kwargs)
|
|
|
|
|
|
def get_max_size():
    """Return the ``FARGO_MAX_DOCUMENT_SIZE`` setting.

    Reading the setting inside a function means the value is looked up on
    each call -- presumably so that overriding the setting (for example in
    tests) takes effect; confirm against the callers.
    """
    max_document_size = settings.FARGO_MAX_DOCUMENT_SIZE
    return max_document_size
|
|
|
|
|
|
class PushDocumentSerializer(UserSerializerMixin):
    """Input of the document push API.

    On top of the user identification fields of ``UserSerializerMixin`` it
    takes an origin label, the base64 encoded file content, an optional
    file name and a flag telling whether the user may delete the document.
    """
    origin = serializers.CharField(required=True)
    file_b64_content = api_fields.Base64FileField(required=True, max_size=get_max_size)
    file_name = serializers.CharField(required=False)
    deletable_by_user = serializers.BooleanField(required=False, default=True)

    def validate(self, data):
        """Resolve the user and refuse files overflowing the document box.

        Raises ``api_errors.APIError('BOX_IS_FULL')`` when the space already
        used by the user plus the size of the pushed file is greater than
        ``settings.FARGO_MAX_DOCUMENT_BOX_SIZE``.
        """
        data = super(PushDocumentSerializer, self).validate(data)
        already_used = Document.objects.used_space(data['user'])
        projected_size = already_used + data['file_b64_content'].size
        box_limit = settings.FARGO_MAX_DOCUMENT_BOX_SIZE
        if projected_size > box_limit:
            raise api_errors.APIError('BOX_IS_FULL', limit=str(box_limit))
        return data
|
|
|
|
|
|
class PushDocument(CommonAPIMixin, GenericAPIView):
    """Admin-only endpoint adding a document to a user's document box."""
    serializer_class = PushDocumentSerializer
    permission_classes = (IsAdminUser,)

    def post(self, request, format=None):
        """Store the pushed file and attach it to the target user.

        The origin and the document are fetched or created (the document is
        looked up by the SHA-256 hash of its content), then the link between
        user, document and origin is created.  Raises
        ``api_errors.APIError('DOCUMENT_EXISTS')`` when that link already
        exists.
        """
        serializer = self.get_serializer(data=request.data)
        # Raises serializers.ValidationError(serializer.errors) on failure.
        serializer.is_valid(raise_exception=True)
        payload = serializer.validated_data

        origin_label = payload.get('origin')
        origin = Origin.objects.get_or_create(
            slug=slugify(origin_label),
            defaults={'label': origin_label})[0]

        uploaded_file = payload['file_b64_content']
        if payload.get('file_name'):
            uploaded_file.name = payload.get('file_name')
        digest = utils.sha256_of_file(uploaded_file)
        document = Document.objects.get_or_create(
            content_hash=digest,
            defaults={'content': uploaded_file})[0]

        link_fields = {
            'user': payload.get('user'),
            'filename': payload.get('file_name'),
            'document': document,
            'origin': origin,
            'deletable_by_user': payload.get('deletable_by_user'),
        }
        user_document, is_new = UserDocument.objects.get_or_create(**link_fields)
        if not is_new:
            raise api_errors.APIError('DOCUMENT_EXISTS')
        user_document.save()

        return Response(None, status.HTTP_200_OK)


push_document = PushDocument.as_view()
|
|
|
|
|
|
class UserDocumentSerializer(serializers.Serializer):
    """Read-only rendering of a user document as a label and a URL."""

    def to_representation(self, obj):
        """Return the file name and the absolute download URL of ``obj``.

        The serializer context must contain the current ``request``; it is
        used to make the download URL absolute.
        """
        label = obj.filename
        request = self.context['request']
        absolute_url = request.build_absolute_uri(obj.get_download_url())
        return {'label': label, 'url': absolute_url}
|
|
|
|
|
|
class RecentDocuments(ListAPIView):
    """List the most recent documents of the requesting user."""
    serializer_class = UserDocumentSerializer

    def get_queryset(self):
        """Return at most ten documents of the last 14 days, newest first.

        Only documents belonging to ``self.request.user`` are returned.
        """
        # Use Django's now() rather than datetime.datetime.now(): it yields
        # an aware datetime when USE_TZ is enabled (and a naive one
        # otherwise), so the comparison with ``created`` never mixes naive
        # and aware values.
        cutoff = now() - datetime.timedelta(days=14)
        return UserDocument.objects.filter(
            user=self.request.user,
            created__gt=cutoff,
        ).order_by('-created')[:10]


recent_documents = RecentDocuments.as_view()
|
|
|
|
|
|
class ValidationSerializer(UserSerializerMixin, serializers.ModelSerializer):
    """Serializer of ``Validation`` objects for one document type.

    The document type schema is passed as the mandatory ``schema`` keyword
    argument; one ``CharField`` is added per entry of its ``metadata``
    list, each one reading and writing a key of the ``data`` attribute.
    """
    origin = api_fields.SlugCreatedRelatedField(slug_field='label', queryset=Origin.objects.all())
    url = serializers.SerializerMethodField()
    display = serializers.CharField(read_only=True)

    def __init__(self, *args, **kwargs):
        """Build the serializer and add the fields described by ``schema``."""
        document_schema = kwargs.pop('schema')
        super(ValidationSerializer, self).__init__(*args, **kwargs)
        self.document_type = document_schema['name']
        self.document_type_schema = document_schema
        for descriptor in document_schema['metadata']:
            varname = descriptor['varname']
            # Metadata fields are required unless the schema says otherwise.
            self.fields[varname] = serializers.CharField(
                source='data.%s' % varname,
                required=descriptor.get('required', True),
                allow_blank=True)

    def get_url(self, instance):
        """Return the detail URL of ``instance``.

        The URL is made absolute when a request is present in the context.
        """
        route_kwargs = {'document_type': instance.document_type, 'pk': instance.pk}
        detail_url = reverse('fargo-api-validation-detail', kwargs=route_kwargs)
        if 'request' not in self.context:
            return detail_url
        return self.context['request'].build_absolute_uri(detail_url)

    def validate(self, data):
        """Resolve the user and fill in the computed attributes.

        Sets ``document_type``, ``created`` (current time without
        microseconds), ``start`` and ``end``, and truncates ``creator`` to
        256 characters.
        """
        data = super(ValidationSerializer, self).validate(data)
        data['document_type'] = self.document_type
        creation_time = now().replace(microsecond=0)
        data['created'] = creation_time
        first_day = creation_time.date()
        data['start'] = first_day
        # NOTE(review): ``first_day`` is a date, and adding a timedelta to a
        # date only takes its whole-day part into account -- confirm that
        # FARGO_VALIDATION_LIFETIME is meant to be rounded down to days.
        lifetime = datetime.timedelta(seconds=settings.FARGO_VALIDATION_LIFETIME)
        data['end'] = first_day + lifetime
        data['creator'] = data['creator'][:256]
        return data

    class Meta:
        model = Validation
        exclude = ('data', 'user', 'document_type')
        read_only_fields = ('created', 'start', 'end')
|
|
|
|
|
|
class FilterByUser(filters.BaseFilterBackend):
    """Filter backend restricting a queryset to one user.

    The user is selected with the ``user_email`` or ``user_nameid`` query
    string parameter; ``user_email`` takes precedence when both are given.
    """

    # (query string parameter, queryset lookup), in order of precedence.
    _lookups = (
        ('user_email', 'user__email'),
        ('user_nameid', 'user__saml_identifiers__name_id'),
    )

    def filter_queryset(self, request, queryset, view):
        """Return ``queryset`` filtered by the first parameter present.

        The queryset is returned unchanged when neither parameter is given.
        """
        for parameter, lookup in self._lookups:
            if parameter in request.GET:
                return queryset.filter(**{lookup: request.GET[parameter]})
        return queryset
|
|
|
|
|
|
class ValidationAPI(CommonAPIMixin,
                    mixins.CreateModelMixin,
                    mixins.RetrieveModelMixin,
                    mixins.ListModelMixin,
                    viewsets.GenericViewSet):
    """Admin-only create/retrieve/list API of validations of one type.

    The document type comes from the URL; its schema is loaded in
    ``initial()`` and handed to the serializer.
    """
    serializer_class = ValidationSerializer
    permission_classes = (IsAdminUser,)
    filter_backends = [FilterByUser]
    queryset = Validation.objects.all()

    def get_queryset(self):
        """Restrict the validations to the document type of the request."""
        validations = super(ValidationAPI, self).get_queryset()
        return validations.filter(document_type=self.document_type)

    def initial(self, request, document_type, *args, **kwargs):
        """Load the schema of ``document_type`` before handling the request.

        Raises a ``serializers.ValidationError`` whose status code is forced
        to 404 when no schema is found for the document type.
        """
        self.schema = utils.get_document_type_schema(settings, document_type)
        if not self.schema:
            unknown_type = serializers.ValidationError('unknown document type')
            unknown_type.status_code = status.HTTP_404_NOT_FOUND
            raise unknown_type
        self.document_type = document_type
        parent = super(ValidationAPI, self)
        parent.initial(request, document_type, *args, **kwargs)

    def get_serializer(self, *args, **kwargs):
        """Build the serializer, handing it the document type schema."""
        parent = super(ValidationAPI, self)
        return parent.get_serializer(schema=self.schema, *args, **kwargs)
|
|
|
|
# Router of the validation API; the ``document_type`` group captured from the
# URL is received by ValidationAPI.initial() as a keyword argument.
# NOTE(review): newer Django REST framework releases rename ``base_name`` to
# ``basename`` -- confirm against the pinned version before upgrading.
router = routers.SimpleRouter()
router.register(
    prefix=r'validation/(?P<document_type>[^/]*)',
    viewset=ValidationAPI,
    base_name='fargo-api-validation')
|