This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
polynum/polynum/request/views.py

456 lines
19 KiB
Python

# -*- coding: utf-8 -*-
import datetime
import os.path
import json
import csv
from django.core.urlresolvers import reverse
from django.conf import settings
from django.shortcuts import redirect, get_object_or_404, render
from django.contrib.formtools.wizard.views import NamedUrlSessionWizardView
from django.core.files.storage import FileSystemStorage
from django.contrib.auth.decorators import login_required
from django.views.generic.detail import SingleObjectMixin, DetailView
from django.views.generic.list import ListView
from django.db import transaction
from django.db.models import Q
from django.contrib.formtools.wizard.storage.session import SessionStorage
from django.utils.translation import ugettext_lazy as _
from django.http import Http404, HttpResponse, HttpResponseRedirect
from django.utils.http import urlencode
from django.contrib import messages
from .forms import DocumentUploadForm, DocumentDetailsForm, ReproDetailsForm, \
DeliveryForm, CopyrigtsForm, ValidationForm, ReproOriginForm, \
FinancialForm, CostForm
from ..base.models import Request, History, Transition, Entity, Status
from ..base.models.request import get_default_status
from ..base.rbac import get_workable_requests, get_workflows
from .. import app_settings
from utils import field_completion
named_new_request_forms = (
('document_upload', DocumentUploadForm),
('document_details', DocumentDetailsForm),
('repro_origin', ReproOriginForm),
('reprography', ReproDetailsForm),
('delivery', DeliveryForm),
('document_copyrights', CopyrigtsForm),
('financial_information', FinancialForm),
('real_cost', CostForm),
('validation', ValidationForm),
)
class DummyStorage(SessionStorage):
def get_step_files(self, step):
return None
class RequestWizardView(NamedUrlSessionWizardView, SingleObjectMixin):
model = Request
template_name = 'new_request.html'
file_storage = FileSystemStorage(location = settings.DOCUMENTS_UPLOAD_DIRECTORY)
storage_name = 'polynum.request.views.DummyStorage'
def get_template_names(self):
return ['%s.%s' % (self.steps.current, self.template_name), self.template_name]
def done(self, form_list, **kwargs):
ctx = self.get_context_data(form_list[0], **kwargs)
if ctx.get('validated'):
instance = self.get_object()
instance.status = get_default_status()
instance.save()
return redirect('list_request')
for step, form, missings in ctx['all_forms']:
if missings:
return redirect(self.get_step_url(step=step))
raise NotImplemented
def get_form_initial(self, step):
initial = super(RequestWizardView, self).get_form_initial(step=step) or {}
initial['delivery_date'] = datetime.date.today()+datetime.timedelta(3)
initial['contact_email'] = getattr(self.request.user, 'mail', '')
initial['contact_telephone1'] = getattr(self.request.user, 'telephoneNumber', '')
initial['contact_telephone2'] = getattr(self.request.user, 'supannAutreTelephone', '')
initial['contact_bureau'] = getattr(self.request.user, 'roomNumber', '')
initial['sponsor'] = '%(first_name)s %(last_name)s' % self.request.user._wrapped.__dict__
return initial
def get_context_data(self, form, **kwargs):
'''
Add all_forms = list of (step, form, pprint_data), where pprint_data is
a list of (name, value) created by each form, representing a "nice"
view of the stored datas
'''
context = super(RequestWizardView, self).get_context_data(form=form, **kwargs)
# cleaned_data = self.get_all_cleaned_data() or {}
all_forms = []
validated = True
for step, form in self.get_form_list().items():
form_instance = self.get_form(step=step)
pprint_data = form_instance.pprint_data({})
if pprint_data:
missings = [ l[0] for l in pprint_data if len(l) > 2 and l[2]]
else:
missings = []
validated = validated and not bool(missings)
all_forms += [ (step, form, pprint_data, missings) ]
context['all_forms'] = all_forms
context['validated'] = validated
context['object'] = self.get_object()
context['workflows'] = get_workflows(self.request.user, context['object']) \
.exclude(action__special_type='edit').filter(action__validate_request__lte=validated).order_by('-warn', 'default')
context['roots'] = Entity.objects.filter(parent_relations__isnull=True).values_list('code', 'description')
return context
def get_prefix(self, *args, **kwargs):
prefix = super(RequestWizardView, self).get_prefix(*args, **kwargs)
prefix += '_%s' % kwargs.get('pk', '')
return prefix
def get(self, request, *args, **kwargs):
return super(RequestWizardView, self).get(request, *args, **kwargs)
def get_form_instance(self, step):
return self.get_object()
def get_step_url(self, step):
return reverse(self.url_name, kwargs={self.pk_url_kwarg: self.get_object().pk, 'step': step})
def render_next_step(self, form, **kwargs):
form.save()
# special case when coming from the validation page
if 'correction' in self.request.GET:
return redirect(self.get_step_url('validation'))
pprint_data = form.pprint_data({})
# do not apply when going to the validation page, that's useless
if any([x[2] for x in pprint_data]) and self.steps.next != 'validation':
field_names = [x[0] for x in pprint_data if x[2]]
messages.warning(self.request,
_(u"Vous n'avez pas remplis les champs obligatoires suivants: "
u"%s. Vous pouvez tout de même continuer à remplir votre demande, "
u"et aussi la sauver comme brouillon. Mais vous ne pourrez pas "
u"la soumettre tant que ces champs ne seront pas remplis")
% ', '.join(map(unicode, field_names)))
return super(RequestWizardView, self).render_next_step(form, **kwargs)
def dispatch(self, request, *args, **kwargs):
self.transition = None
return super(RequestWizardView, self).dispatch(request, *args, **kwargs)
def get_form_list(self):
if self.transition is None:
self.transition = get_workflows(self.request.user, self.get_object()) \
.select_related().get(action__special_type='edit')
pages = self.transition.action.edit_pages()
for form_name in self.get_form_list():
if form_name not in pages:
self.condition_dict[form_name] = False
self.condition_dict['validation'] = True
return super(RequestWizardView, self).get_form_list()
class RequestDetails(DetailView):
model = Request
template_name = 'request_detail.html'
context_object_name = 'poly_request'
queryset = Request.objects.select_related().prefetch_related('choices', 'base_profile__choices')
def get_context_data(self, **kwargs):
ctx = super(RequestDetails, self).get_context_data(**kwargs)
request = self.get_object()
ctx['docname'] = request.name
if request.uploadfile:
ctx['filename'] = os.path.basename(request.uploadfile.name)
if not ctx['docname']:
ctx['docname'] = ctx['filename']
ctx['creation'] = request.history_set.start()
ctx['end'] = request.history_set.end()
ctx['workflows'] = get_workflows(self.request.user, request) \
.filter(action__validate_request__lte=request.validate()) \
.order_by('-warn', 'default')
return ctx
class RequestHistory(DetailView):
model = Request
template_name = 'request_history.html'
context_object_name = 'poly_request'
@login_required
@transaction.commit_on_success
def new_request(request):
request = Request.objects.create(user=request.user)
return redirect('request_wizard', pk=request.pk)
@transaction.commit_on_success
def request_delete(request, pk):
instance = get_object_or_404(Request, pk=pk)
instance.delete()
return redirect('list_request')
request_wizard_step = login_required(RequestWizardView.as_view(named_new_request_forms,
done_step_name='finished', url_name='request_wizard_step'))
def upload(request, attached_file):
response = HttpResponse(attached_file.chunks(), mimetype='application/octet-stream')
response['Content-disposition'] = 'attachment'
return response
def request_download(request, pk):
instance = get_object_or_404(Request, pk=pk)
if instance.uploadfile is None:
raise Http404
return upload(request, instance.uploadfile)
@transaction.commit_on_success
def request_action(request, pk, workflow_pk):
poly_request = get_object_or_404(Request, pk=pk)
try:
workflow = get_workflows(request.user, poly_request).get(pk=workflow_pk)
except Transition.DoesNotExist:
raise Http404
if workflow.comment and not request.POST.get('comment'):
return render(request, 'request_action.html', locals())
poly_request.act(request.user, workflow, description=request.POST.get('comment'))
if workflow.edit:
return redirect('request_wizard', pk=pk)
elif workflow.delete:
return redirect('request_delete', pk=pk)
elif workflow.duplicate:
return redirect('request_duplicate', pk=pk)
return redirect('request_detail', pk=pk)
@transaction.commit_on_success
def request_duplicate(request, pk):
poly_request = get_object_or_404(Request, pk=pk)
new_poly_request = get_object_or_404(Request, pk=pk)
new_poly_request.pk = None
new_poly_request.status = get_default_status()
new_poly_request.save()
new_poly_request.choices = poly_request.choices.all()
return redirect('request_wizard', pk=new_poly_request.pk)
class ListRequest(ListView):
template_name = 'list_request.html'
model = Request
paginate_by = app_settings.PAGINATE_BY
context_object_name = 'requests'
def dispatch(self, request, *args, **kwargs):
self.request = request
self.filters = []
self.filter_choices = []
self.sort = request.GET.get('sort')
self.handle_filter_status()
self.handle_search_filter()
self.handle_entity_filter()
self.handle_date_filter()
return super(ListRequest, self).dispatch(request, *args, **kwargs)
def get(self, request, *args, **kwargs):
if 'filter_csv' in request.GET:
response = HttpResponse(content_type="text/csv")
response['Content-disposition'] = 'filename=listing.csv'
writer = csv.writer(response)
fields = ['user__username', 'name', 'uploadfile', 'nb_pages', 'usage__name', 'licence__name', 'copyright',
'comments', 'sponsor', 'entity__description', 'copies',
'status__name', 'base_profile__name', 'details',
'delivery_date', 'delivery_place__name', 'comments',
'creation_date', 'modification_date', 'contact_email',
'contact_telephone1', 'contact_telephone2', 'contact_bureau',
'financial_code', 'financial_comment', 'cost' ]
writer.writerow(fields)
for values in self.get_queryset().values_list(*fields):
values = map(lambda s: unicode(s).encode('utf-8'), values)
writer.writerow(values)
return response
return super(ListRequest, self).get(request, *args, **kwargs)
def post(self, request, *args, **kwargs):
params = {}
if 'from_date' in request.POST:
params['from_date'] = request.POST['from_date']
if 'to_date' in request.POST:
params['to_date'] = request.POST['to_date']
return HttpResponseRedirect(self.qs(**params))
def get_queryset(self, without_filter=()):
qs = super(ListRequest, self).get_queryset()
qs = get_workable_requests(self.request.user)
# apply filters
qs = qs.filter(status__visible=True)
for filter_name, value, display_list in self.filters:
if filter_name in without_filter:
continue
apply_filter = 'apply_filter_%s' % filter_name
if hasattr(self, apply_filter):
qs = getattr(self, apply_filter)(qs, value)
else:
qs = qs.filter(**{filter_name: value})
# apply ordering
if self.sort:
qs = qs.order_by(self.sort)
return qs
def handle_filter_status(self):
current = self.request.GET.get('filter_status')
choices = []
if current:
display = Status.objects.get(id=current).name
self.filters.append(('status', current, [(display, self.qs(**{'status': None}))]))
choices = self.get_queryset(without_filter=['status']).exclude(status=current).values_list('status', 'status__name').distinct()
choices = [
(self.qs(**{'status': status}), display)
for status, display in choices
]
if choices:
self.filter_choices.append({
'caption': _(u'Sur le statut'),
'choices': choices })
def handle_search_filter(self):
current = self.request.GET.get('filter_search')
if current:
self.filters.append(('search', current, [_(u'Contient le texte %s') % current,
self.qs(**{'search': None})]))
def apply_filter_search(sefl, qs, value):
return qs.filter(Q(uploadfile__contains=value)
| Q(name__contains=value)
| Q(user__username__contains=value))
def handle_entity_filter(self, min_depth=2, max_depth=4):
value = self.request.GET.get('filter_entity')
qs = Entity.objects \
.filter(depth__lte=max_depth, depth__gte=min_depth) \
.filter(Q(children_relations__child__request__isnull=False)|
Q(request__isnull=False)) \
.filter(Q(parent_relations__parent__roleassociation__group__user=self.request.user)|
Q(roleassociation__group__user=self.request.user)) \
.order_by('name')
by_code = dict([(entity.code, entity) for entity in qs])
current = None
path = []
if value:
path = value.split(',')
path_entities = []
for entity_code in path:
entity = by_code.get(entity_code)
if not entity:
return
path_entities.append(entity)
current = path_entities[-1]
display_list = [ (_(u'Composante %s') % path_entities[0].name,
self.qs(**{'entity': None}))]
delete_value = path[0]
for entity in path_entities[1:]:
display_list.append((entity.get_name(),
self.qs(**{'entity': delete_value})))
delete_value = '%s,%s' % (delete_value, entity.code)
self.filters.append(('entity', path[-1], display_list))
parent = None
if len(path) > 1:
parent = by_code.get(path[-2])
if current:
if current.depth == max_depth or len(current.children_relations.all()) == 0:
depth = current.depth
path = path[:-1]
else:
depth = current.depth+1
else:
depth = min_depth
path = []
# filter on depth
choices = [entity for entity in by_code.itervalues() if entity.depth==depth]
# filter on parenting
if parent:
choices = [entity for entity in choices if parent in [relation.parent for relation in entity.parent_relations.all()]]
# create filter choices
choices = [(self.qs(**{'entity': ','.join(path+[entity.code])}),
entity.get_name()) for entity in choices if entity != current]
if choices:
self.filter_choices.append({
'caption': _('Sur le service ou la composante'),
'choices': choices,
})
def apply_filter_entity(self, qs, value):
print 'value', value
return qs.filter(Q(entity__parent_relations__parent__code=value)|
Q(entity__code=value))
def get_filters_field(self, field, caption):
result = {}
result['caption'] = caption
current = self.filters.get(field)
values = self.get_queryset().values_list(field, flat=True).distinct()
if current:
choices = [({field: None}, _('< Tous')), (None, values[0])]
else:
if not values:
return []
choices = [({field: a}, a) for a in values]
choices = [ (self.qs(**a) if a else a, b) for a, b in choices ]
result['choices'] = choices
return [result]
def qs(self, **kwargs):
params = dict()
for k in self.request.GET:
params[k] = self.request.GET[k]
for k,v in kwargs.iteritems():
k = 'filter_%s' % k
if v is None:
params.pop(k, None)
else:
params[k] = v
return '?%s' % urlencode(params)
def handle_date_filter(self):
from_date = self.request.GET.get('filter_from_date')
to_date = self.request.GET.get('filter_to_date')
if from_date:
try:
from_date = datetime.datetime.strptime(from_date, '%Y-%m-%d').date()
except ValueError:
from_date = None
if to_date:
try:
to_date = datetime.datetime.strptime(to_date, '%Y-%m-%d').date()
except ValueError:
to_date = None
display_list = []
if from_date:
display_list.append(
(_(u'À livrer après le %s') % from_date, self.qs(**{'from_date': None})))
if to_date:
display_list.append(
(_(u'À livrer avant le %s') % to_date, self.qs(**{'to_date': None})))
if display_list:
self.filters.append(('date', (from_date, to_date), display_list))
def apply_filter_date(self, qs, value):
from_date, to_date = value
if from_date:
qs = qs.filter(delivery_date__gte=from_date)
if to_date:
qs = qs.filter(delivery_date__lte=to_date)
return qs
def get_context_data(self, **kwargs):
ctx = super(ListRequest, self).get_context_data(**kwargs)
ctx['sort'] = self.sort
ctx['filters'] = self.filters
ctx['filter_choices'] = self.filter_choices
ctx['total_cost'] = sum((request.estimated_cost() for request in self.get_queryset()))
ctx['qs'] = self.qs()
ctx['now'] = datetime.date.today().strftime('%Y-%m-%d')
ctx['csv_qs'] = self.qs(**{'csv': 1})
return ctx
def autocomplete(request, field_names=('name',)):
completion_data = field_completion(request.user, field_names, without_scores='without_scores' in request.GET)
response = HttpResponse(json.dumps(completion_data), mimetype='application/javascript')
return response