This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
polynum/polynum/base/models/request.py

472 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
Request and document
"""
import os.path
import datetime
import decimal
import logging
import re
from django.db import models
from django.utils.translation import ugettext_lazy as _, string_concat
from django.core import validators
from django.template import Template, Context
from django.core.mail import EmailMessage
from django.utils.timezone import now
from django.core.urlresolvers import reverse
from django.conf import settings
from ... import utils
from .. import app_settings
from .. import fields
import profile
from user import User # we use a proxy user model to get ldap attribute retrieval
import workflow
TELEPHONE_VALIDATOR = validators.RegexValidator(regex=r'[0-9. ]*',
message=_(u'Les numéros de téléphone ne peuvent contenir '
u'que des chiffres, des points ou des espaces'))
logger = logging.getLogger(__name__)
class DocumentUsage(models.Model):
class Meta:
app_label = 'base'
verbose_name = _(u"Usage d'un document")
verbose_name_plural = _(u"Usages d'un document")
ordering = ('order', 'name')
def __unicode__(self):
return self.name
name = models.CharField(max_length=64, verbose_name=_(u'Usage'))
description = models.TextField(verbose_name=_(u'Description'), blank=True)
url = models.URLField(max_length=200, verbose_name=_(u'URL'), blank=True)
comments = models.TextField(verbose_name=_(u'Remarques et commentaires'), blank=True)
order = models.IntegerField(verbose_name=_(u'Ordre'), default=0)
no_diffusion = models.BooleanField(verbose_name=_(u'Ne pas diffuser'),
help_text=_(u'Si coché, pour les documents ayant cet usage, la diffusion ne sera pas proposée.'),
default=False, blank=True)
class DocumentLicence(models.Model):
class Meta:
app_label = 'base'
verbose_name = _(u"Diffusion d'un document")
verbose_name_plural = _(u"Diffusions d'un document")
ordering = ('order', 'name',)
def __unicode__(self):
return self.name
name = models.CharField(max_length=64, verbose_name=_(u'Droit de diffusion'))
description = models.TextField(verbose_name=_(u'Description'), blank=True)
url = models.URLField(max_length=200, verbose_name=_(u'URL'), blank=True)
comments = models.TextField(verbose_name=_(u'Remarques et commentaires'), blank=True)
order = models.IntegerField(verbose_name=_(u'Ordre'), default=0)
default = models.BooleanField(verbose_name=_(u'Valeur par défaut'),
blank=True, default=False)
# Si le document contient des extraits copyrightés, on ne peut pas diffuser le document
only_free_documents = models.BooleanField(verbose_name=_(u'Réservé aux'
u' documents libres de droits'), blank=True, default=False)
# pour ajouter un nouveau type de diffusion, l'ajouter ici
DIFFUSION_TAGS = [
('oai-pmh', _(u'Pousser le document dans le flux OAI-PMH')),
('public-url', _(u'Le document possède une URL publique')),
]
@classmethod
def add_diffusion_tag(cls, tag, description):
for t, d in cls.DIFFUSION_TAGS:
if t == tag:
return
cls.DIFFUSION_TAGS.append((tag, description))
diffusion_tags = fields.MultiSelectField(
verbose_name=_(u"Tags de diffusion"),
help_text=_(u"Ce sont les options diverses que l'ont peut "
u"associer à ce type de diffusion"),
blank=True,
choices=DIFFUSION_TAGS)
class DeliveryPlace(models.Model):
class Meta:
app_label = 'base'
verbose_name = _(u"Lieu de livraison")
verbose_name_plural = _(u"Lieux de livraison")
ordering = ('order', 'name',)
def __unicode__(self):
return self.name
order = models.IntegerField(verbose_name=_(u'Ordre'), default=0)
name = models.CharField(max_length=64, verbose_name=_(u'Lieu de livraison'))
description = models.TextField(verbose_name=_(u'Description'), blank=True)
url = models.URLField(max_length=200, verbose_name=_(u'URL'), blank=True)
comments = models.TextField(verbose_name=_(u'Remarques et commentaires'), blank=True)
def get_default_status():
'''Retourne le seul et unique status par défaut, sinon une erreur.'''
return workflow.Status.objects.get(default=True)
def minimum_delivery_timedelta():
return datetime.timedelta(days=app_settings.MINIMUM_DELIVERY_DELAY)
class RequestQueryset(utils.QuerySet):
def total_cost(self):
zero = decimal.Decimal(0)
# special case for Postgres so that it is fast in production
if utils.qs_use_postgres(self):
cost_function1 = '({1}.ppp * {0}.nb_pages + {1}.ppd)*{0}.copies'.format(
Request._meta.db_table, profile.Profile._meta.db_table)
cost_function2 = '''SELECT SUM(({1}.ppp * {0}.nb_pages + {1}.ppd)*{0}.copies) FROM {1}, {2}
WHERE {1}.id = {2}.profileoptionchoice_id AND {2}.request_id = {0}.id'''.format(
Request._meta.db_table, profile.ProfileOptionChoice._meta.db_table,
Request._meta.get_field('choices').rel.through._meta.db_table)
qs1 = self \
.prefetch_related(None) \
.select_related(depth=0) \
.extra(select={
'base_profile_estimated_cost' : cost_function1,
'choices_estimated_cost': cost_function2
})
qs1.query.join((Request._meta.db_table, profile.Profile._meta.db_table,
Request._meta.get_field('base_profile').column,
profile.Profile._meta.get_field('id').column))
return sum([r.cost or ((r.base_profile_estimated_cost or zero)+(r.choices_estimated_cost or zero)) for r in qs1])
else:
# generic implementation working everywhere
qs = self.select_related(depth=0) \
.select_related('base_profile') \
.prefetch_related(None) \
.prefetch_related('choices') \
.only('nb_pages', 'copies', 'base_profile__ppp',
'base_profile__ppd', 'choices__ppp', 'choices__ppd')
total_cost = zero
for r in [ r for r in qs if r.base_profile and (r.get_copies() > 0) ]:
ppp = r.base_profile.ppp
ppd = r.base_profile.ppd
for c in r.choices.all():
ppp += c.ppp
ppd += c.ppd
total_cost += ((r.nb_pages * ppp) + ppd) * r.get_copies()
return total_cost
class Counter(models.Model):
month = models.DateField(unique=True)
counter = models.IntegerField(default=0)
class Meta:
app_label = 'base'
class Request(models.Model):
COPYRIGHT_CHOICES = (
(False, _(
u"Le document ne contient pas d'extrait d'oeuvre soumise"
u" au droit d'auteur")),
(True, _(
u"Le document contient un ou des extrait(s) d'oeuvres(s)"
u"soumise(s) au droit d'auteur")),
)
NUMBER_RE = re.compile(r'(\d\d\d\d)-(\d+)-(\d+)')
FINANCIAL_CODE_RE = re.compile(r'\d{6,}')
class Meta:
app_label = 'base'
verbose_name = _(u'Demande de reprographie');
verbose_name_plural = _(u'Demandes de reprographie')
ordering = ('-creation_date','-month_order')
objects = RequestQueryset.as_manager()
user = models.ForeignKey(User, verbose_name=_(u'Demandeur'))
month_order = models.IntegerField(verbose_name=_(u'Numéro'), default=-1,
unique_for_month='creation_date')
name = models.TextField(verbose_name=_(u'Titre'), blank=True)
uploadfile = models.FileField(upload_to='upload', verbose_name=_(u'Fichier'), blank=True, max_length=512)
nb_pages = models.PositiveIntegerField(verbose_name=_(u'Nombre de pages'), blank=True, null=True, default=0,
help_text=_(u"""Une information exacte sur le nombre de pages et le titre facilitera la tâche du Service de Reprographie."""))
usage = models.ForeignKey(DocumentUsage, verbose_name=_(u'Usage du document'), blank=True, null=True, on_delete=models.PROTECT,
help_text=_(u"""Lusage de votre document sera utile si vous acceptez de lenvoyer à la Bibliothèque (Etape 6)."""))
licence = models.ForeignKey(DocumentLicence, verbose_name=_(u'Droits de diffusion'), blank=True, null=True, on_delete=models.PROTECT)
copyright = models.NullBooleanField(verbose_name=_(u"Déclaration de présence d'extraits protégés"), choices=COPYRIGHT_CHOICES, blank=True, null=True)
comments = models.TextField(verbose_name=_(u'Remarques et commentaires'), blank=True)
sponsor = models.CharField(max_length=128, verbose_name=_(u'Commanditaire'), help_text=_(u"Complétez cette information si vous faites cette demande pour un tiers."), blank=True)
entity = models.ForeignKey('Entity', verbose_name=_(u'Destinataires de ce document'), on_delete=models.PROTECT, blank=True, null=True,
help_text=_(u"""Renseigner le diplôme ou l'UE concernée permettra un meilleur suivi de votre document."""))
copies = models.PositiveIntegerField(verbose_name=_(u'Nombre d\'exemplaires'), blank=True, null=True, default=1, validators=[validators.MinValueValidator(1)])
status = models.ForeignKey(workflow.Status, verbose_name=_(u'Status'), on_delete=models.PROTECT)
base_profile = models.ForeignKey('Profile',
verbose_name=_(u'Profil de reprographie'),
help_text = _(u"Choissez le profil de base, puis complétez si "
u"besoin vos choix dans les options ci-dessous."),
blank=True, null=True, on_delete=models.PROTECT,
limit_choices_to={'visible': True})
choices = models.ManyToManyField('ProfileOptionChoice', verbose_name=_(u'Tous les choix de reprographie'), blank=True)
details = models.TextField(verbose_name=_(u'Vos observations ou commentaires'), blank=True)
delivery_date = models.DateField(verbose_name=_(u'Date de livraison souhaitée'),
blank=True, null=True,
help_text=string_concat(_(u'Délai minimum à respecter '), app_settings.MINIMUM_DELIVERY_DELAY, _(' jours.')),
default=lambda:utils.get_next_workable_day(app_settings.MINIMUM_DELIVERY_DELAY))
delivery_place = models.ForeignKey(DeliveryPlace, verbose_name=_(u'Lieu de livraison'), blank=True, null=True, on_delete=models.PROTECT)
comments = models.TextField(verbose_name=_(u'Remarques et commentaires'), blank=True)
creation_date = models.DateTimeField(verbose_name=_(u'Date de création'), default=now, editable=False, db_index=True)
modification_date = models.DateTimeField(verbose_name=_(u'Date de modification'), editable=False, db_index=True)
contact_email = models.EmailField(verbose_name=_(u'Email'), blank=True)
contact_telephone1 = models.CharField(max_length=32,
verbose_name=_(u'Téléphone'),
validators=[TELEPHONE_VALIDATOR],
blank=True)
contact_telephone2 = models.CharField(max_length=32,
verbose_name=_(u'Téléphone (bis)'),
validators=[TELEPHONE_VALIDATOR],
blank=True)
contact_bureau = models.CharField(max_length=64,
verbose_name=_('Bureau'),
blank=True)
financial_code = models.CharField(max_length=64,
verbose_name=_(u'Code d\'imputation'), blank=True, db_index=True)
financial_comment = models.TextField(
verbose_name=_(u"Commentaire pour la facturation"), blank=True)
cost = models.DecimalField(verbose_name=_(u'Coût'), blank=True, null=True,
max_digits=10, decimal_places=5)
is_from_remote_request = models.BooleanField(blank=True,
verbose_name=_(u'Demande initiée via un Web-Service'),
default=False)
def copyright_status(self):
if self.copyright is None:
return _(u'Inconnu')
else:
return dict(self.COPYRIGHT_CHOICES).get(self.copyright)
def request_number(self):
s = unicode(self.creation_date.strftime('%Y-%m-'))
s += unicode(self.month_order)
return s.replace(u'-', u'\u2011')
def request_number_csv(self):
s = unicode(self.creation_date.strftime('%Y-%m-'))
s += unicode(self.month_order)
return s
def get_copies(self):
return self.copies or 0
def get_nb_pages(self):
return self.nb_pages or 0
def total_pages(self):
return self.get_copies() * self.get_nb_pages()
def sponsor_display(self):
fn = self.user.get_full_name()
if not fn:
fn = self.user.username
if self.sponsor.strip() and self.sponsor != fn:
return '%s (%s)' % (fn, self.sponsor.strip())
return fn
def sponsor_username(self):
'''Try to extract a LDAP username from the sponsor field'''
m = re.search('\((.*)\)', self.sponsor)
if m:
return m.group(1)
return ''
def estimated_cost(self):
total = decimal.Decimal(0)
if self.base_profile:
total += ((self.base_profile.ppp * self.get_nb_pages()) + self.base_profile.ppd) * self.get_copies()
for choice in self.choices.all():
total += ((choice.ppp * self.get_nb_pages()) + choice.ppd) * self.get_copies()
return total
def missing(self):
null = [None, '', 0]
required = ['name', 'nb_pages', 'usage',
'licence','entity', 'copies', 'base_profile',
'delivery_place', 'delivery_date']
for field in required:
v = getattr(self, field, None)
for n in null:
if v is n:
yield field
if not self.no_diffusion() and self.copyright is None:
yield 'copyright'
def validate(self):
return not bool(list(self.missing()))
def save(self, *args, **kwargs):
self.modification_date = now()
if self.month_order == -1:
date = self.creation_date.date()
date = date.replace(day=1)
counter, created = Counter.objects.get_or_create(month=date)
counter = Counter.objects.select_for_update().get(month=date)
counter.counter += 1
counter.save()
self.month_order = counter.counter
super(Request, self).save(*args, **kwargs)
def act(self, who, workflow, description=None):
from .. import rbac
history = self.history_set.all()
ctx = {
'request': self,
'actor': who,
'comment': description,
'history': history,
'reversed_history': reversed(history),
'request_url': settings.SITE_URL + self.get_absolute_url(),
}
ctx = Context(ctx)
logger.info('action %s par %s avec le commentaire %s sur la demande %s',
unicode(workflow).encode('utf-8'),
unicode(who).encode('utf-8'),
unicode(description).encode('utf-8'),
unicode(self.request_number()).encode('utf-8'))
for mail_notification in workflow.action.mailnotification_set.all():
emails = []
if mail_notification.to == 'requestor':
if self.user.email:
emails = [ self.user.email ]
elif mail_notification.to == 'last_actor':
for history in reversed(history):
if history.old_status != history.new_status and history.user.email:
emails = [ history.user.email ]
break
elif mail_notification.to == 'all_actors':
emails = list(set([h.user.email for h in history]) - set([who.email]))
elif mail_notification.to == 'next_actors':
emails = set(u.email for u in rbac.get_actors(workflow.destination, self.entity))
else:
raise NotImplementedError
# remove empty strings
emails = filter(None, emails)
if not emails:
continue
prefix = '{% load url from future %}'
subject = Template(prefix+mail_notification.subject_template).render(ctx)
body = Template(prefix+mail_notification.body_template).render(ctx)
logger.info("envoi d'un mail à %s",
(', '.join(emails)).encode('utf-8'))
email = EmailMessage(subject=subject, body=body, to=emails)
try:
email.send()
except:
logger.exception("l'envoi du mail a echoue")
History.objects.create(request=self,
user=who,
old_status=self.status,
new_status=workflow.destination,
action=workflow.action,
description=description or '')
self.status = workflow.destination
self.save()
def file_name(self):
if self.uploadfile:
return os.path.basename(self.uploadfile.name)
return ''
def sponsor_name(self):
sponsor_name = re.match(r'[^(]*', self.sponsor).group(0).strip()
if not sponsor_name:
sponsor_name = self.user.display_name()
return sponsor_name
def is_paper(self):
return not bool(self.uploadfile)
def no_diffusion(self):
return self.is_from_remote_request or \
(self.usage and self.usage.no_diffusion) \
or self.is_paper()
def clean(self):
if self.no_diffusion():
self.licence = DocumentLicence(id=1)
def __unicode__(self):
return '<Request id:%s>' % (self.pk,)
def get_absolute_url(self):
return reverse('request_detail', kwargs={'pk': self.pk})
class HistoryManager(models.Manager):
def start(self):
try:
return self.filter(new_status__start=True).latest()
except History.DoesNotExist:
return None
def start_date(self):
start = self.start()
if start:
return start.date
else:
return now()
def end(self):
end = None
for history in self.select_related('new_status').reverse():
if history.new_status.end:
end = history
elif end is not None:
return end
else:
return None
def end_date(self):
end = self.end()
if end is None:
return None
else:
return end.date
def last_show_in_details(self):
for h in self.all().reverse():
if h.action.special_type == 'show_in_details':
return h
return None
history_manager = HistoryManager()
history_manager.use_for_related_fields = True
class History(models.Model):
objects = history_manager
class Meta:
app_label = 'base'
verbose_name = _(u"Historique")
verbose_name_plural = _(u"Historiques des demandes")
ordering = ('date',)
get_latest_by = 'date'
def __unicode__(self):
return self.date.strftime('%Y-%m-%d %H:%M:%S ') + unicode(self.action)
date = models.DateTimeField(auto_now=True, db_index=True)
request = models.ForeignKey(Request, verbose_name=_(u'Demande'))
user = models.ForeignKey(User, verbose_name=_(u'Utilisateur'), blank=True, null=True, on_delete=models.SET_NULL)
action = models.ForeignKey(workflow.Action, verbose_name=_('Action'), on_delete=models.PROTECT)
old_status = models.ForeignKey(workflow.Status,
verbose_name=_(u"État de la demande avant l'action"), blank=True,
related_name='+', null=True, on_delete=models.PROTECT)
new_status = models.ForeignKey(workflow.Status,
verbose_name=_(u"État de la demande après l'action"),
related_name='+', on_delete=models.PROTECT)
description = models.TextField(verbose_name=_(u"Détails éventuels sur l'action"), blank=True)