# fargo - document box
# Copyright (C) 2016-2019 Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import logging
import re
from copy import deepcopy
from json import dumps

from django.core.exceptions import PermissionDenied
from django.views.decorators.clickjacking import xframe_options_exempt
from django.views.generic import CreateView, DeleteView, UpdateView, View, TemplateView
from django.core.urlresolvers import reverse, reverse_lazy
from django.contrib.auth.decorators import login_required
from django.shortcuts import get_object_or_404, resolve_url
from django.http import (HttpResponse, HttpResponseRedirect,
                         HttpResponseBadRequest, HttpResponseForbidden, Http404)
from django.core import signing
from django.contrib import messages
from django.contrib.auth import get_user_model, REDIRECT_FIELD_NAME
from django.contrib.auth import logout as auth_logout
from django.contrib.auth import views as auth_views
from django.utils.http import quote
from django.utils.translation import ugettext as _
from django.utils.decorators import method_decorator
from django.conf import settings

from django_tables2 import SingleTableMixin

from ..utils import make_url
from . import models, forms, tables

# mellon is an optional dependency: when it cannot be imported, a stub that
# reports no identity provider is used instead, so login() and logout() fall
# through to the local Django authentication.
try:
    from mellon.utils import get_idps
except ImportError:
    def get_idps():
        '''Fallback used without mellon: there is no identity provider.'''
        return []


class Logger(object):
    '''Mixin giving an instance a ``logger`` attribute.

    The logger is the one registered under this module's name.  The
    constructor is cooperative: positional and keyword arguments are passed
    on to the next class in the MRO, so a view class listed after this mixin
    still receives its initialisation keyword arguments.
    '''

    def __init__(self, *args, **kwargs):
        # Forward the arguments instead of swallowing them; without this the
        # classes following Logger in the MRO are never initialised.
        super(Logger, self).__init__(*args, **kwargs)
        self.logger = logging.getLogger(__name__)


class CommonUpload(Logger, CreateView):
    '''Create view storing an uploaded file as a UserDocument of the
    requesting user.'''
    model = models.UserDocument
    form_class = forms.UploadForm
    template_name = 'fargo/upload.html'

    def get_form_kwargs(self, **kwargs):
        '''Bind the form to a new, unsaved document owned by the requester.'''
        form_kwargs = super(CommonUpload, self).get_form_kwargs(**kwargs)
        owner = self.request.user
        form_kwargs['instance'] = models.UserDocument(user=owner)
        return form_kwargs

    def form_valid(self, form):
        '''Let the parent handle the valid form, then log the upload.'''
        response = super(CommonUpload, self).form_valid(form)
        # self.object is the document created by the parent implementation
        uploaded = self.object
        self.logger.info(
            u'user uploaded file %s (sha256=%s)',
            uploaded.filename,
            uploaded.document.content_hash)
        return response


class Upload(CommonUpload):
    '''Standalone upload page; a "cancel" submission skips the upload.'''

    def get_success_url(self):
        '''Return the redirect target given in the query string, or the
        homepage URL when none is given.'''
        # NOTE(review): the target comes straight from the query string and
        # is not validated here -- confirm this cannot be abused as an open
        # redirect.
        fallback = reverse('home')
        query = self.request.GET
        return query.get(REDIRECT_FIELD_NAME, fallback)

    def post(self, request, *args, **kwargs):
        '''Process the upload unless the user pressed cancel.'''
        if 'cancel' not in request.POST:
            return super(Upload, self).post(request, *args, **kwargs)
        # cancelling goes to the same place a successful upload would
        return HttpResponseRedirect(self.get_success_url())


class Documents(object):
    '''Mixin restricting a view to the documents of the requesting user.'''

    def get_queryset(self):
        '''Return the requester's UserDocument rows, with the related
        document and user selected in the same query.'''
        owned = models.UserDocument.objects.filter(user=self.request.user)
        return owned.select_related('document', 'user')


class Homepage(Documents, SingleTableMixin, CommonUpload):
    '''List the documents of the current user in a paginated table, together
    with the upload form.'''
    template_name = 'fargo/home.html'
    form_class = forms.UploadForm
    table_class = tables.DocumentTable
    table_pagination = {'per_page': 5}
    success_url = reverse_lazy('home')

    def get_context_data(self, **kwargs):
        '''Add display settings and storage occupancy figures to the
        template context.'''
        ctx = super(Homepage, self).get_context_data(**kwargs)
        ctx['include_edit_link'] = settings.INCLUDE_EDIT_LINK
        ctx['max_document_size'] = settings.FARGO_MAX_DOCUMENT_SIZE

        # storage used by the user compared to the configured box size
        used = models.Document.occupancy_for_user(self.request.user)
        ctx['occupancy'] = used
        box_size = settings.FARGO_MAX_DOCUMENT_BOX_SIZE
        ctx['max_portfolio_size'] = box_size
        ctx['occupancy_ratio'] = float(used) / box_size
        ctx['occupancy_ratio_percent'] = float(used) * 100.0 / box_size
        return ctx


class PickView(object):
    '''Mixin for views that require a ``pick`` query string parameter.'''

    def dispatch(self, request, *args, **kwargs):
        '''Store the ``pick`` parameter as ``self.pick_url`` and continue
        dispatching; answer with a 400 response when it is missing or empty.
        '''
        pick_url = request.GET.get('pick')
        self.pick_url = pick_url
        if pick_url:
            return super(PickView, self).dispatch(request, *args, **kwargs)
        return HttpResponseBadRequest('missing pick parameter')


class PickList(PickView, Homepage):
    '''Homepage variant rendered with the pick template; requires the
    ``pick`` parameter.'''
    template_name = 'fargo/pick.html'

    def post(self, request, *args, **kwargs):
        '''Handle an upload, or on cancel redirect to the pick URL with an
        empty ``cancel`` parameter.'''
        if 'cancel' not in request.POST:
            return super(PickList, self).post(request, *args, **kwargs)
        cancel_url = make_url(self.pick_url, cancel='')
        return HttpResponseRedirect(cancel_url)


class Delete(Logger, Documents, DeleteView):
    '''Delete one of the requesting user's documents.'''
    model = models.UserDocument

    def delete(self, request, *args, **kwargs):
        '''Delete the document, then notify the user and log the deletion.

        Raises PermissionDenied when the document is not flagged as
        deletable by its user.
        '''
        user_document = self.get_object()
        if not user_document.deletable_by_user:
            raise PermissionDenied()
        # Remember the identifying values before deleting: Django normally
        # resets the primary key of a deleted instance to None, which made
        # the log line below useless.
        filename = user_document.filename
        pk = user_document.pk
        result = super(Delete, self).delete(request, *args, **kwargs)
        messages.info(request, _('File %s deleted') % filename)
        self.logger.info('user deleted file %r(%s)', filename, pk)
        return result

    def get_success_url(self):
        '''Go two path levels up, keeping the current query string.'''
        # QUERY_STRING may be absent from the environment; treat as empty
        return '../..?%s' % self.request.META.get('QUERY_STRING', '')


class Edit(Logger, UpdateView):
    '''Edit a document through EditForm; only its owner may do so.'''
    model = models.UserDocument
    form_class = forms.EditForm

    def dispatch(self, request, *args, **kwargs):
        '''Raise PermissionDenied unless the requester owns the document.'''
        requester = getattr(request, 'user', None)
        owner = self.get_object().user
        if requester != owner:
            raise PermissionDenied
        return super(Edit, self).dispatch(request, *args, **kwargs)

    def get_success_url(self):
        '''Go two path levels up, keeping the current query string.'''
        query_string = self.request.META['QUERY_STRING']
        return '../..?%s' % query_string


class Pick(PickView, Documents, Logger, View):
    '''Return a picked document to the URL given in the ``pick`` parameter.

    The requester is redirected to that URL with a ``url`` parameter holding
    a download link that carries a signed token.
    '''
    # NOTE(review): Django's View reads ``http_method_names``; this attribute
    # has no effect. Only ``post`` is defined, so other methods are already
    # rejected. Kept unchanged for compatibility.
    http_method_allowed = ['post']

    def post(self, request, pk):
        '''Sign the document pk and redirect to the pick URL with the
        resulting download URL; 404 if the document is not the requester's.
        '''
        user_document = get_object_or_404(
            self.get_queryset(), pk=pk, user=request.user)
        token = signing.dumps(user_document.pk)
        download_url = make_url(
            reverse('remote_download',
                    kwargs={'filename': user_document.filename}),
            token=token,
            request=request)
        # Log the URL the document is returned to; the previous code passed
        # the module-level ``pick`` view here instead of the URL.
        self.logger.info(u'user picked file %s sha256 %s returned to %s',
                         user_document.filename,
                         user_document.document.content_hash,
                         self.pick_url)
        return HttpResponseRedirect(make_url(self.pick_url, url=download_url))


class Download(Documents, Logger, View):
    '''Send one of the requesting user's documents as an attachment.'''

    def get(self, request, pk, filename):
        '''Look up the document among the requester's own, log the download
        and return its content; 404 when it is not found.'''
        own_documents = self.get_queryset()
        user_document = get_object_or_404(
            own_documents, pk=pk, user=self.request.user)
        self.logger.info(
            'user download file %s with hash %s',
            user_document.filename,
            user_document.document.content_hash)
        return self.return_user_document(user_document)

    def return_user_document(self, user_document):
        '''Build the attachment response from the document content chunks.'''
        chunks = user_document.document.content.chunks()
        response = HttpResponse(chunks, content_type='application/octet-stream')
        response['Content-disposition'] = 'attachment'
        return response


class Thumbnail(Documents, View):
    '''Serve the thumbnail of one of the requesting user's documents.'''

    def get(self, request, pk, filename):
        '''Return the thumbnail as image/jpeg; 404 when the document is not
        the requester's or has no thumbnail.'''
        own_documents = self.get_queryset()
        user_document = get_object_or_404(
            own_documents, pk=pk, user=self.request.user)
        thumbnail = user_document.document.thumbnail
        if thumbnail:
            return HttpResponse(thumbnail.read(), content_type='image/jpeg')
        raise Http404


class RemoteDownload(Download):
    '''Serve any document to a bearer of a valid signed token.

    No login is involved: the ``token`` query parameter is the signed
    primary key of the document and is accepted for 60 seconds.
    '''

    def get(self, request, filename):
        '''Check the token, then log and return the designated document.

        Answers 403 when the token is missing, expired or badly signed, and
        404 when the document does not exist.
        '''
        if 'token' not in request.GET:
            return HttpResponseForbidden('missing token')
        # FIXME: maybe tokens should be invalidated after use, through the
        # cache ?
        signed_pk = request.GET['token']
        try:
            # tokens older than one minute are refused
            pk = signing.loads(signed_pk, max_age=60)
        except signing.SignatureExpired:
            return HttpResponseForbidden('token has expired')
        except signing.BadSignature:
            return HttpResponseForbidden('token signature is invalid')

        user_document = get_object_or_404(models.UserDocument, pk=pk)
        owner = user_document.user
        self.logger.info(
            'anonymous download of file %s from user %s(%s) with hash %s',
            user_document.filename,
            owner,
            owner.pk,
            user_document.document.content_hash)
        return self.return_user_document(user_document)


class JSONP(Documents, View):
    '''Expose the requesting user's documents as a JSONP response.'''

    # Accepted callbacks: a JavaScript identifier, optionally dotted
    # (``cb``, ``jQuery.cb_123``). Anything else would be reflected verbatim
    # into an executable response, so it is refused.
    _callback_re = re.compile(
        r'[A-Za-z_$][0-9A-Za-z_$]*(?:\.[A-Za-z_$][0-9A-Za-z_$]*)*\Z')

    def get_data(self, request):
        '''Return a list of ``{'filename': ..., 'url': ...}`` dicts, one per
        document, where ``url`` is the absolute download URL.'''
        data = []
        for user_document in self.get_queryset():
            path = reverse('download',
                           kwargs={'pk': user_document.pk,
                                   'filename': user_document.filename})
            data.append({
                'filename': user_document.filename,
                'url': request.build_absolute_uri(path),
            })
        return data

    def get(self, request):
        '''Return ``<callback>(<json>)`` as JavaScript.

        The function name is read from the ``callback`` parameter and
        defaults to ``callback``; an invalid name yields a 400 response.
        '''
        callback = request.GET.get('callback', 'callback')
        if not self._callback_re.match(callback):
            return HttpResponseBadRequest('invalid callback parameter')
        # the name is pure ASCII at this point, no explicit encoding needed
        body = '%s(%s)' % (callback, dumps(self.get_data(request)))
        return HttpResponse(body, content_type='application/javascript')


class JSON(JSONP):
    '''Expose a user's documents as JSON, readable from any origin.'''

    def get(self, request):
        '''Return the document list of the user named by the ``username``
        parameter, or of the requester when it is absent or empty.

        Answers 404 for an unknown username; an unauthenticated request
        without username goes through login_required.
        '''
        # NOTE(review): any caller reaching this view can list the documents
        # of an arbitrary user through ``username``, and the response allows
        # every origin -- confirm this exposure is intended.
        username = request.GET.get('username')
        if username:
            request.user = get_object_or_404(get_user_model(), username=username)
        elif not request.user.is_authenticated():
            return method_decorator(login_required)(JSON.get)(self, request)
        payload = dumps(self.get_data(request))
        response = HttpResponse(payload, content_type='application/json')
        response['Access-Control-Allow-Origin'] = '*'
        return response


class ChooseDocumentKind(TemplateView):
    '''Render the document kind selection page.'''
    template_name = 'fargo/choose_document_kind.html'

    def get_context_data(self, **kwargs):
        '''Expose the configured document kinds as ``document_kinds``.'''
        context = super(ChooseDocumentKind, self).get_context_data(**kwargs)
        context['document_kinds'] = settings.FARGO_DOCUMENT_KINDS
        return context


def login(request, *args, **kwargs):
    '''Log in through mellon when an identity provider is configured,
    otherwise through the Django login view.

    The ``next`` query parameter, when present, is passed on to the mellon
    login URL.
    '''
    if not any(get_idps()):
        return auth_views.login(request, *args, **kwargs)
    target = resolve_url('mellon_login')
    if 'next' in request.GET:
        target += '?next=' + quote(request.GET.get('next'))
    return HttpResponseRedirect(target)


def logout(request, next_page=None):
    '''Log out through mellon when an identity provider is configured,
    otherwise locally.

    After a local logout the user is redirected to ``next_page`` (resolved
    with resolve_url), or to ``/`` when it is None.
    '''
    if any(get_idps()):
        return HttpResponseRedirect(resolve_url('mellon_logout'))
    auth_logout(request)
    destination = '/' if next_page is None else resolve_url(next_page)
    return HttpResponseRedirect(destination)


class DocumentTypes(View):
    '''Publish the configured document types as JSON.'''

    def get(self, request):
        '''Return ``{"err": 0, "data": [...]}`` where each document type has
        its ``display_template`` entry removed.'''
        # work on a copy so that the settings themselves are left untouched
        document_types = deepcopy(settings.FARGO_DOCUMENT_TYPES)
        for entry in document_types:
            entry.pop('display_template', None)
        payload = dumps({'err': 0, 'data': document_types})
        return HttpResponse(payload, content_type='application/json')


# View callables built from the classes above.

# document box pages and actions, all requiring a logged-in user
home = login_required(Homepage.as_view())
upload = login_required(Upload.as_view())
edit = login_required(Edit.as_view())
delete = login_required(Delete.as_view())
download = login_required(Download.as_view())
thumbnail = login_required(Thumbnail.as_view())

# document picking; the pick list is exempt from X-Frame-Options
pick = login_required(Pick.as_view())
pick_list = xframe_options_exempt(login_required(PickList.as_view()))

# document listings
jsonp = login_required(JSONP.as_view())
json = login_required(JSON.as_view())

# endpoints reachable without login
remote_download = RemoteDownload.as_view()
document_types = DocumentTypes.as_view()