docbow/docbow_project/docbow/views.py

1004 lines
34 KiB
Python

import smtplib
from itertools import count, chain
import socket
import os.path
import operator
import datetime
import tempfile
import zipfile
from django.contrib.auth.decorators import login_required
from django.core.files.base import File
import django.contrib.auth as auth
from django.shortcuts import render, redirect, get_object_or_404
from django.conf import settings
from django.urls import reverse
from django.http import HttpResponse, HttpResponseForbidden, HttpResponseRedirect, Http404, JsonResponse
from django.db.models.expressions import RawSQL
from django.db.models.query import Q
from django.contrib.auth.models import User
from django.contrib import messages
from django.core.mail import EmailMessage
from django.utils.translation import ugettext as _, ugettext_noop as N_, ungettext
from django.forms.forms import NON_FIELD_ERRORS
from django.utils.timezone import now
from django.views.generic.list import MultipleObjectMixin, ListView
from django.views.generic.base import View, RedirectView
from django.views.decorators.debug import sensitive_post_parameters
from django.views.decorators.cache import never_cache
from django.views.decorators.http import require_http_methods
from django.utils.encoding import force_text
from django.utils import six
try:
from BeautifulSoup import BeautifulSoup
except ImportError:
from bs4 import BeautifulSoup
import django_tables2.views as tables_views
import watson.search as watson
from docbow_project.docbow.forms import (
FileForm,
AnonymousContactForm,
ContactForm,
ForwardingForm,
FilterForm,
)
from docbow_project.docbow.models import (
Mailbox,
AttachedFile,
SendingLimitation,
MailingList,
is_guest,
Document,
FileType,
DeletedDocument,
SeenDocument,
non_guest_users,
username,
)
from docbow_project.docbow.decorators import as_delegate
from docbow_project.docbow import tables
from docbow_project.docbow import app_settings
from docbow_project.docbow import unicodecsv
from docbow_project.docbow import profile_views
from docbow_project.docbow import sql
from docbow_project.docbow.utils import date_to_aware_datetime
gettext_noop = lambda x: x
@login_required
def homepage(request):
response = redirect('inbox')
server_name = request.META.get('SERVER_NAME')
if server_name.count('.') >= 2:
domain = '.' + '.'.join(server_name.split('.')[-2:])
response.set_cookie('docbow-user', 'true', domain=domain, max_age=30 * 86400)
return response
def get_base_document_query(request, qs, outbox):
related_users = get_related_users(request)
mailboxes = Mailbox.objects.filter(owner__in=related_users, outbox=outbox)
private_filter = Q(private=False) | Q(private=True, mailboxes__owner=request.user)
if hasattr(request.user, 'delegate'):
private_filter = Q(private=False)
return qs.filter(mailboxes__in=mailboxes).filter(private_filter)
def get_document(request, mailbox_id, outbox):
related_users = get_related_users(request)
mailboxes = Mailbox.objects.filter(owner__in=related_users, outbox=outbox)
private_filter = Q(private=False) | Q(private=True, mailboxes__owner=request.user)
if hasattr(request.user, 'delegate'):
private_filter = Q(private=False)
return (
Document.objects.exclude(deleteddocument__user=request.user, deleteddocument__soft_delete=False)
.filter(id=mailbox_id, mailboxes__in=mailboxes)
.filter(private_filter)
.distinct()
.get()
)
class Row(object):
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
def user_mailing_list_names(user):
return user.mailing_lists.values_list('name', flat=True)
def recursive_lists(lists):
lists = set(lists)
while True:
old_count = len(lists)
lists |= set(MailingList.objects.filter(is_active=True).filter(members_lists__in=lists))
if len(lists) == old_count:
break
return lists
def get_file_form_kwargs(request):
user = request.user
if is_guest(user):
user = user.delegations_by.get().by
delegators = []
else:
delegators = non_guest_users().filter(Q(id=user.id) | Q(delegations_to__to=user)).distinct()
user_lists = MailingList.objects.is_member_of(user)
own_limitations = (
MailingList.objects.filter(is_active=True)
.filter(lists_limitation__mailing_list__in=user_lists)
.distinct()
.order_by('name')
)
if not own_limitations:
return {}
kwargs = {}
user_lists = MailingList.objects.are_member_of([user] + list(delegators))
if SendingLimitation.objects.filter(mailing_list__in=user_lists):
lists = (
MailingList.objects.filter(is_active=True)
.filter(lists_limitation__mailing_list__in=user_lists)
.distinct()
.order_by('name')
)
lists = recursive_lists(lists)
users = non_guest_users().filter(mailing_lists__in=lists).distinct()
if lists:
kwargs['list_qs'] = lists
kwargs['user_qs'] = users
return kwargs
def get_filetype_limitation(user):
# find delegation relations
if is_guest(user):
user = user.delegations_by.get().by
delegators = []
else:
delegators = non_guest_users().filter(Q(id=user.id) | Q(delegations_to__to=user)).distinct()
# if user has basically no limitation, do not limit him
user_lists = MailingList.objects.is_member_of(user)
own_limitations = (
FileType.objects.filter(is_active=True, filetype_limitation__mailing_list__in=user_lists)
.distinct()
.order_by('name')
)
if not own_limitations.exists():
return FileType.objects.none()
if delegators:
user_lists = MailingList.objects.are_member_of([user] + list(delegators))
return (
FileType.objects.filter(is_active=True, filetype_limitation__mailing_list__in=user_lists)
.distinct()
.order_by('name')
)
else:
return own_limitations
def get_table_kwargs():
res = {}
if not settings.EXTRA_SENDERS:
res['exclude'] = ('extra_senders',)
return res
@login_required
@never_cache
@watson.update_index()
def send_file(request, file_type_id):
file_type = get_object_or_404(FileType, id=file_type_id)
try:
reply_to = get_document(request, request.GET['reply_to'], False)
except Document.DoesNotExist:
raise Http404
except KeyError:
reply_to = None
if hasattr(request.user, 'delegate'):
delegators = User.objects.none()
else:
delegators = (
User.objects.filter(Q(id=request.user.id) | Q(delegations_to__to=request.user))
.filter(is_active=True)
.order_by('last_name', 'first_name', 'username')
.distinct()
)
real_user = getattr(request.user, 'delegate', request.user)
limitations = get_filetype_limitation(request.user)
if limitations:
if not limitations.filter(id=file_type.id).exists():
return redirect('send-file-selector')
if request.method == 'POST':
if 'send' not in request.POST:
return redirect('outbox')
form = FileForm(
request.POST,
request.FILES,
default_sender=request.user,
user=real_user,
delegations=delegators,
reply_to=reply_to,
file_type=file_type,
**get_file_form_kwargs(request),
)
try:
if form.is_valid():
new_send = form.save(commit=False)
to_user = []
to_list = []
for recipient in form.cleaned_data['recipients']:
if recipient.startswith('list-'):
i = recipient.split('-')[1]
to_list.append(int(i))
elif recipient.startswith('user-'):
i = recipient.split('-')[1]
to_user.append(int(i))
new_send.save()
form.save_m2m()
form.save_attachments()
new_send.to_user.set(to_user)
new_send.to_list.set(to_list)
recipients_count = new_send.post()
request.record(
'create-document',
'sent document {document} ' 'delivered to {recipients_count}',
document=new_send,
recipients_count=recipients_count,
)
return redirect('outbox')
except Exception:
import logging
logging.getLogger(__name__).exception('unable to create a new document')
form._errors.setdefault(NON_FIELD_ERRORS, form.error_class()).append(
_(
'An internal error occured, administrators '
'have been notified; sending seems blocked at the moment. You should '
'try agrain later. If it still does not work then, contact '
'your administrator.'
)
)
else:
form = FileForm(
default_sender=request.user,
user=real_user,
delegations=delegators,
reply_to=reply_to,
file_type=file_type,
**get_file_form_kwargs(request),
)
form_action = ''
if reply_to:
form_action = '?reply_to=%s' % reply_to.id
return render(
request,
'docbow/send_file.html',
{
'form': form,
'view_name': 'send-file',
'reply_to': reply_to,
'url': request.GET.get('url'),
'form_action': form_action,
},
)
@never_cache
def upload(request, attached_file):
response = HttpResponse(attached_file.content.chunks(), content_type='application/octet-stream')
response['Content-disposition'] = 'attachment'
return response
def get_document_and_mark_seen(request, mailbox_id, outbox):
try:
document = get_document(request, mailbox_id, outbox)
except Document.DoesNotExist:
raise Http404
if not SeenDocument.objects.filter(document=document, user=request.user).exists():
SeenDocument.objects.create(document=document, user=request.user)
return document
@login_required
@never_cache
def message_attached_file(request, mailbox_id, attached_file, outbox=False):
"""
Download attached files, verify that the user has access to the document
before, otherwise return 404.
"""
document = get_document_and_mark_seen(request, mailbox_id, outbox)
attached_file = get_object_or_404(AttachedFile, document=document, pk=attached_file)
request.record(
'download',
'download attached file {attached_file} of document {document}',
attached_file=attached_file,
document=document,
)
return upload(request, attached_file)
@login_required
@never_cache
def message_all_files(request, mailbox_id, outbox=False):
document = get_document_and_mark_seen(request, mailbox_id, outbox)
if not document.has_zip_download():
return redirect('inbox')
temp = tempfile.TemporaryFile()
with zipfile.ZipFile(temp, mode='w') as zf:
for file_ in document.attached_files.all():
zf.write(file_.content.path, file_.name)
temp.seek(0)
response = HttpResponse(File(temp), content_type='application/octet-stream')
response['Content-disposition'] = 'attachment; filename="documents.zip"'
request.record('download', 'download zip archive of {document}', document=document)
return response
def form_to_user_and_list(form):
to_user = []
to_list = []
for recipient in form.cleaned_data['recipients']:
if recipient.startswith('list-'):
i = recipient.split('-')[1]
to_list.append(int(i))
elif recipient.startswith('user-'):
i = recipient.split('-')[1]
to_user.append(int(i))
return to_user, to_list
@login_required
@never_cache
def message(request, mailbox_id, outbox=False):
try:
document = get_document(request, mailbox_id, outbox)
except Document.DoesNotExist:
if Document.objects.filter(pk=mailbox_id):
messages.warning(request, _('You cannot see this document as you are not the recipient'))
else:
messages.warning(request, _('Document not found'))
if outbox:
return redirect('outbox')
else:
return redirect('inbox')
if outbox:
back_pair = (reverse('outbox'), gettext_noop('back to outbox'))
else:
back_pair = (reverse('inbox'), gettext_noop('back to inbox'))
back = 'outbox' if outbox else 'inbox'
ctx = {'outbox': outbox, 'document': document, 'view_name': back, 'back': back_pair}
if not outbox:
limitations = get_filetype_limitation(request.user)
if not limitations or limitations.filter(id=document.filetype.id).exists():
if request.method == 'POST':
form = ForwardingForm(request.POST, user=request.user, **get_file_form_kwargs(request))
if form.is_valid():
users, lists = form_to_user_and_list(form)
recipients_count, forwarded_document = document.forward(
form.cleaned_data['sender'], lists, users
)
request.record(
'forward-document',
'forwarded document ' '{document} as new document {new_document}',
document=forwarded_document.from_document,
new_document=forwarded_document.to_document,
)
msg = ungettext(
'Document forwarded to {recipients_count} ' 'recipient.',
'Document forwarded to {recipients_count} ' 'recipients.',
recipients_count,
).format(recipients_count=recipients_count)
messages.info(request, msg)
return redirect('inbox-message', mailbox_id=document.pk)
else:
form = ForwardingForm(user=request.user, **get_file_form_kwargs(request))
ctx['form'] = form
ctx['attached_files'] = attached_files = []
for attached_file in document.attached_files.order_by('kind__position', 'kind__name', 'id'):
if attached_files and attached_files[-1][0] == attached_file.kind:
attached_files[-1][1].append(attached_file)
else:
attached_files.append((attached_file.kind, [attached_file]))
request.record('message-view', 'looked at document {document}', document=document)
ctx['related_users'] = get_related_users(request)
extra_senders = ''
if document.extra_senders.exists():
extra_senders = ", ".join([username(sender) for sender in document.extra_senders.all()])
ctx['extra_senders'] = extra_senders
return render(request, 'docbow/message.html', ctx)
def get_help_content(pagename):
filepath = os.path.join(settings.HELP_DIR, pagename)
parsed_doc = BeautifulSoup(open(filepath).read())
try:
page = parsed_doc.findAll(True, role='main')[0]
except IndexError: # newer yelp-tools
page = parsed_doc.findAll('main')[0]
for t in page.findAll('h4'):
t.name = 'h6'
for t in page.findAll('h3'):
t.name = 'h5'
for t in page.findAll('h2'):
t.name = 'h4'
for t in page.findAll('h1'):
t.name = 'h3'
return force_text(page)
@login_required
def help(request, pagename='index.html'):
if pagename.endswith('.html'):
return render(
request, 'docbow/help.html', {'view_name': 'help', 'content': get_help_content(pagename)}
)
else:
filepath = os.path.join(settings.HELP_DIR, pagename)
if not os.path.abspath(filepath).startswith(settings.HELP_DIR):
return HttpResponseForbidden()
response = HttpResponse(content=open(filepath, 'rb'))
response['Content-Type'] = 'image/png'
return response
def contact(request, template='docbow/contact.html', form_class=AnonymousContactForm):
user = request.user
if user.is_authenticated:
template = 'docbow/contact_user.html'
form_class = ContactForm
contacts = User.objects.filter(groups__name__in=settings.CONTACT_GROUPS)
if not bool(contacts):
msg = N_('unable to send the contact mail because there is no ' 'administrator group to receive them')
request.error_record('error', msg)
messages.error(request, _(msg))
return redirect('inbox')
if request.method == 'POST':
form = form_class(request.POST)
if form.is_valid():
cleaned_data = form.cleaned_data
to = [contact.email for contact in contacts]
subject = settings.CONTACT_SUBJECT_PREFIX + cleaned_data['subject']
message = cleaned_data['message']
if user.is_authenticated:
reply_to = user.email
body = (
_('Message from %(name)s <%(email)s>') % {'name': user.get_full_name(), 'email': reply_to}
+ '\n\n%s' % message
)
else:
reply_to = cleaned_data['email']
body = _('Message from %(name)s <%(email)s>') % cleaned_data
if cleaned_data.get('phone_number'):
body += _('\nPhone number: %s') % cleaned_data['phone_number']
body += '\n\n%s' % message
try:
EmailMessage(to=to, subject=subject, body=body, headers={'Reply-To': reply_to}).send()
messages.info(request, _('Your message was sent to %d administrators') % len(to))
request.record(
'contact',
'sent mail to administrators with '
'subject "{subject}" and message "{message}", reply should be sent to email '
'{reply_to} or phone number "{phone}"',
subject=subject,
message=message,
reply_to=reply_to,
phone=cleaned_data.get('phone_number'),
)
except (smtplib.SMTPException, socket.error):
import logging
logging.getLogger(__name__).exception('unable to send mail to administrators')
request.error_record(
'error',
'unable to send mail to administrators with '
'subject "{subject}" and message "{message}", reply should be sent to email '
'{reply_to} or phone number "{phone}"',
subject=subject,
message=message,
reply_to=reply_to,
phone=cleaned_data.get('phone_number'),
)
return redirect('inbox')
else:
form = form_class()
return render(request, template, {'form': form, 'view_name': 'contact'})
def logout(request):
auth.logout(request)
return redirect('inbox')
@login_required
@never_cache
def delete(request, mailbox_id, outbox=False):
'''Remove a document from the inbox'''
page = request.GET.get('page', 1)
back = 'outbox' if outbox else 'inbox'
viewname = back + '-message-delete'
back_pair = ('%s?page=%s' % (reverse(back), page), gettext_noop('back to %s' % back))
try:
document = get_document(request, mailbox_id, outbox)
except Document.DoesNotExist:
return redirect(back_pair[0])
if request.method == 'GET':
return render(
request, 'docbow/delete.html', {'document': document, 'back': back_pair, 'view_name': viewname}
)
else:
try:
DeletedDocument.objects.get(user=request.user, document=document)
except DeletedDocument.DoesNotExist:
DeletedDocument.objects.create(
user=request.user, document=document, soft_delete=True, soft_delete_date=now()
)
request.record('delete-document', 'marked document {document} as deleted', document=document)
return redirect(back_pair[0])
def inbox_by_document(request, document_id):
return redirect('inbox-message', mailbox_id=document_id)
@require_http_methods(["POST"])
def restore(request, doc_id, outbox=False):
try:
deleted_doc = DeletedDocument.objects.get(user=request.user, document__pk=doc_id, soft_delete=True)
deleted_doc.delete()
except DeletedDocument.DoesNotExist:
pass
return_view = 'outbox-trash' if outbox else 'inbox-trash'
return redirect(return_view)
from django.contrib.auth import SESSION_KEY, BACKEND_SESSION_KEY
from django import http
def su(request, username, redirect_url='/'):
"""Allows changing user for super-users. Super-user status is kept using a
flag on the session.
"""
if request.user.is_superuser or request.session.get('has_superuser_power'):
request.session.pop('related_users', None)
su_user = get_object_or_404(User, username=username)
if su_user.is_active:
if is_guest(su_user):
real_username = su_user.username.rsplit('-', 1)[0]
real_user = User.objects.get(username=real_username)
pair_id = '%s,%s' % (real_user.id, su_user.id)
request.session[SESSION_KEY] = pair_id
request.session[
BACKEND_SESSION_KEY
] = 'docbow_project.docbow.auth_backend.DelegationAuthBackend'
request.session['has_superuser_power'] = True
else:
request.session[SESSION_KEY] = su_user.id
request.session['has_superuser_power'] = True
request.session[BACKEND_SESSION_KEY] = 'django.contrib.auth.backends.ModelBackend'
return http.HttpResponseRedirect(redirect_url)
else:
return http.HttpResponseRedirect('/')
@login_required
@never_cache
def mailing_lists(request, template='docbow/mailing-lists.html'):
mailing_lists = (
MailingList.objects.active()
.filter(Q(members__isnull=False) | Q(mailing_list_members__isnull=False))
.distinct()
.order_by('name')
)
return render(request, template, {'mailing_lists': mailing_lists})
def robots(request):
output = '''User-Agent: *
Disallow: /
'''
return HttpResponse(output, content_type='text/plain')
def password_reset_done(request):
messages.info(request, _('Email with password reset instruction has been sent.'))
return redirect('auth_login')
class DateFilterMixinView(object):
def get_filter_form(self):
if not hasattr(self, '_form'):
if 'clear' in self.request.GET:
form = FilterForm(request=self.request, outbox=self.outbox)
else:
form = FilterForm(data=self.request.GET, request=self.request, outbox=self.outbox)
self._form = form
return self._form
def get_context_data(self, **kwargs):
context = super(DateFilterMixinView, self).get_context_data(**kwargs)
context['filter_form'] = self.get_filter_form()
return context
def get_queryset(self):
qs = super(DateFilterMixinView, self).get_queryset()
filter_form = self.get_filter_form()
if filter_form.is_valid():
not_before = filter_form.cleaned_data.get('not_before')
not_after = filter_form.cleaned_data.get('not_after')
search_terms = filter_form.cleaned_data.get('search_terms')
if not_before is not None:
not_before = date_to_aware_datetime(not_before)
qs = qs.filter(date__gte=not_before)
if not_after is not None:
not_after += datetime.timedelta(days=1)
not_after = date_to_aware_datetime(not_after)
qs = qs.filter(date__lt=not_after)
if search_terms:
objects_ids = watson.search(search_terms, models=(Document,)).values_list(
'object_id_int', flat=True
)
qs = qs.filter(id__in=objects_ids)
return qs
class ExtraContextMixin(object):
extra_ctx = {}
def get_context_data(self, **kwargs):
context = super(ExtraContextMixin, self).get_context_data(**kwargs)
context.update(self.extra_ctx)
return context
def get_related_users(request):
"""Compute the list of users for which we can see the mailboxes, and cache
it in the session.
"""
if 'related_users' not in request.session:
users = [request.user]
delegations = request.user.delegations_by.select_related('by')
if delegations:
users.extend(delegation.by for delegation in delegations)
request.session['related_users'] = [user.pk for user in users]
return User.objects.filter(pk__in=request.session['related_users'])
class MailboxQuerysetMixin(object):
def get_queryset(self):
qs = super(MailboxQuerysetMixin, self).get_queryset()
if hasattr(self.request.user, 'delegate'):
qs = qs.filter(private=False)
if not hasattr(self, '_qs'):
self._qs = sql.get_documents(
qs, get_related_users(self.request), self.request.user, self.outbox
).distinct()
return self._qs
def get_context_data(self, **kwargs):
ctx = super(MailboxQuerysetMixin, self).get_context_data(**kwargs)
ctx['related_users'] = get_related_users(self.request)
return ctx
class TrashMailboxQuerysetMixin(object):
def get_queryset(self):
qs = super(TrashMailboxQuerysetMixin, self).get_queryset()
if not hasattr(self, '_qs'):
self._qs = sql.get_trash_documents(
qs, get_related_users(self.request), self.request.user, self.outbox
).distinct()
return self._qs
def get_context_data(self, **kwargs):
ctx = super(TrashMailboxQuerysetMixin, self).get_context_data(**kwargs)
ctx['related_users'] = get_related_users(self.request)
return ctx
class MailboxView(ExtraContextMixin, MailboxQuerysetMixin, tables_views.SingleTableView):
model = Document
table_class = tables.MailboxTable
table_pagination = {
'per_page': app_settings.DOCBOW_MAILBOX_PER_PAGE,
}
def get_context_data(self):
context = super().get_context_data()
context['show_trash'] = settings.TRASH_DURATION > 0
return context
def get_table_kwargs(self):
return get_table_kwargs()
class TrashMailboxView(ExtraContextMixin, TrashMailboxQuerysetMixin, tables_views.SingleTableView):
model = Document
table_class = tables.MailboxTable
table_pagination = {
'per_page': app_settings.DOCBOW_MAILBOX_PER_PAGE,
}
def get_table_kwargs(self):
return get_table_kwargs()
class CSVMultipleObjectMixin(object):
mapping = ()
filename = ''
def get_header(self):
return [column['caption'] for column in self.mapping]
def get_cell(self, mailbox, attribute):
value = operator.attrgetter(attribute)(mailbox)
if callable(value):
value = value()
return force_text(value)
def get_row(self, mailbox):
for column in self.mapping:
yield self.get_cell(mailbox, column['attribute'])
def get_rows(self):
qs = self.get_queryset()
for mailbox in qs:
yield list(self.get_row(mailbox))
def get(self, request, *args, **kwargs):
response = HttpResponse(content_type='text/csv')
response['Content-Disposition'] = 'attachment; filename="%s"' % self.filename
writer = unicodecsv.UnicodeWriter(response, encoding='utf-8')
writer.writerow(self.get_header())
writer.writerows(self.get_rows())
return response
class ODSMultipleObjectMixin(CSVMultipleObjectMixin):
worksheet_name = '1'
def get(self, request, *args, **kwargs):
from . import ods
response = HttpResponse(content_type='application/vnd.oasis.opendocument.spreadsheet')
response['Content-Disposition'] = 'attachment; filename="%s"' % self.filename
workbook = ods.Workbook(encoding='utf-8')
sheet = workbook.add_sheet(self.worksheet_name)
rows = chain(((0, self.get_header()),), enumerate(self.get_rows(), 1))
for i, row in rows:
for j, cell in enumerate(row):
sheet.write(i, j, force_text(cell))
workbook.save(response)
return response
class CSVMailboxView(
CSVMultipleObjectMixin,
ExtraContextMixin,
MailboxQuerysetMixin,
MultipleObjectMixin,
tables_views.SingleTableMixin,
View,
):
model = Document
@property
def filename(self):
return '{prefix}-{user}-{date}.csv'.format(
prefix=self.filename_prefix, user=self.request.user, date=datetime.date.today()
)
def get_header(self):
table = self.get_table()
for column in table.columns:
yield column.header
def get_row(self, row):
for column, cell in row.items():
yield cell
def get_rows(self):
table = self.get_table()
for table_row in table.rows:
yield self.get_row(table_row)
class ODSMailboxView(ODSMultipleObjectMixin, CSVMailboxView):
@property
def filename(self):
return '{prefix}-{user}-{date}.ods'.format(
prefix=self.filename_prefix, user=self.request.user, date=datetime.date.today()
)
class CSVInboxView(CSVMailboxView):
outbox = False
table_class = tables.InboxCsvTable
filename_prefix = 'inbox'
inbox_csv = CSVInboxView.as_view()
class CSVOutboxView(CSVMailboxView):
outbox = True
table_class = tables.OutboxCsvTable
filename_prefix = 'outbox'
outbox_csv = CSVOutboxView.as_view()
class ODSInboxView(ODSMailboxView, CSVInboxView):
pass
inbox_ods = ODSInboxView.as_view()
class ODSOutboxView(ODSMailboxView, CSVOutboxView):
pass
outbox_ods = ODSOutboxView.as_view()
class DeleteDocumentsView(object):
def post(self, request, *args, **kwargs):
documents = None
selection = request.POST.get('selection', '')
if selection == 'everything':
documents = self.get_queryset()
else:
selection = selection.split(',')
if all(map(six.text_type.isdigit, selection)):
selection = map(int, selection)
documents = self.get_queryset().filter(id__in=selection)
if documents:
if 'delete' in request.POST:
documents = documents.exclude(deleteddocument__user=request.user)
dd = [
DeletedDocument(
user=request.user, document=document, soft_delete=True, soft_delete_date=now()
)
for document in documents
]
DeletedDocument.objects.bulk_create(dd)
return HttpResponseRedirect(request.build_absolute_uri())
class InboxView(DeleteDocumentsView, DateFilterMixinView, MailboxView):
http_method_names = ['get', 'post']
outbox = False
table_class = tables.InboxTable
template_name = 'docbow/inbox_list.html'
extra_ctx = {
'view_name': 'inbox',
}
def get_context_data(self):
context = super().get_context_data()
context['trash_url'] = reverse('inbox-trash')
return context
inbox_view = login_required(InboxView.as_view())
class InboxTrashView(TrashMailboxView):
outbox = False
table_class = tables.InboxTrashTable
template_name = 'docbow/inbox_trash_list.html'
extra_ctx = {
'view_name': 'inbox_trash',
}
inbox_trash_view = login_required(InboxTrashView.as_view())
class OutboxView(DeleteDocumentsView, DateFilterMixinView, MailboxView):
outbox = True
table_class = tables.OutboxTable
template_name = 'docbow/outbox_list.html'
extra_ctx = {
'view_name': 'outbox',
}
def get_context_data(self):
context = super().get_context_data()
context['trash_url'] = reverse('outbox-trash')
return context
outbox_view = login_required(OutboxView.as_view())
class OutboxTrashView(TrashMailboxView):
outbox = True
table_class = tables.OutboxTrashTable
template_name = 'docbow/outbox_trash_list.html'
extra_ctx = {
'view_name': 'outbox_trash',
}
outbox_trash_view = login_required(OutboxTrashView.as_view())
class SendFileSelectorView(ListView):
template_name = 'docbow/send_file_selector.html'
model = FileType
def get_queryset(self):
qs = super(SendFileSelectorView, self).get_queryset()
limitations = get_filetype_limitation(self.request.user)
if limitations:
return limitations
return qs.filter(is_active=True)
send_file_selector = login_required(SendFileSelectorView.as_view())
delegate = RedirectView.as_view(url='/profile/', permanent=False)
password_change = sensitive_post_parameters()(
never_cache(login_required(as_delegate(profile_views.PasswordChangeView.as_view())))
)
profile = login_required(as_delegate(profile_views.FullProfileView.as_view()))
def search_terms(request, outbox=True, limit_choices=10):
q = request.GET.get('term', '')
documents = sql.get_documents(
Document.objects.all(),
get_related_users(request),
request.user,
outbox,
)
search = watson.search(q, models=(documents,))
choices = []
for match in search:
match_words = [word for word in match.description.split() if word.lower().startswith(q.lower())]
for word in match_words:
if word in choices:
continue
choices.append(word)
if len(choices) == limit_choices:
break
if len(choices) == limit_choices:
break
return JsonResponse(choices, safe=False)
@login_required
def search_inbox(request):
return search_terms(request, outbox=False)
@login_required
def search_outbox(request):
return search_terms(request, outbox=True)