472 lines
20 KiB
Python
472 lines
20 KiB
Python
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
Request and document
|
||
"""
|
||
|
||
import os.path
|
||
import datetime
|
||
import decimal
|
||
import logging
|
||
import re
|
||
|
||
from django.db import models
|
||
from django.utils.translation import ugettext_lazy as _, string_concat
|
||
from django.core import validators
|
||
from django.template import Template, Context
|
||
from django.core.mail import EmailMessage
|
||
from django.utils.timezone import now
|
||
from django.core.urlresolvers import reverse
|
||
from django.conf import settings
|
||
|
||
from ... import utils
|
||
from .. import app_settings
|
||
from .. import fields
|
||
|
||
import profile
|
||
from user import User # we use a proxy user model to get ldap attribute retrieval
|
||
import workflow
|
||
|
||
# Phone numbers may only contain digits, dots and spaces.
# The pattern is anchored on both ends: RegexValidator checks the value with
# ``regex.search``, so the previous unanchored ``[0-9. ]*`` matched the empty
# string inside any input and therefore never rejected anything.
TELEPHONE_VALIDATOR = validators.RegexValidator(
    regex=r'^[0-9. ]*$',
    message=_(u'Les numéros de téléphone ne peuvent contenir '
              u'que des chiffres, des points ou des espaces'))

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
||
|
||
class DocumentUsage(models.Model):
    """Usage category that a request can reference for its document.

    Rows are ordered by ``order`` and then by ``name``.
    """

    name = models.CharField(
        max_length=64,
        verbose_name=_(u'Usage'))
    description = models.TextField(
        verbose_name=_(u'Description'),
        blank=True)
    url = models.URLField(
        max_length=200,
        verbose_name=_(u'URL'),
        blank=True)
    comments = models.TextField(
        verbose_name=_(u'Remarques et commentaires'),
        blank=True)
    order = models.IntegerField(
        verbose_name=_(u'Ordre'),
        default=0)
    # When set, diffusion is not offered for documents having this usage
    # (see the help text below).
    no_diffusion = models.BooleanField(
        verbose_name=_(u'Ne pas diffuser'),
        help_text=_(u'Si coché, pour les documents ayant cet usage, la diffusion ne sera pas proposée.'),
        default=False,
        blank=True)

    class Meta:
        app_label = 'base'
        verbose_name = _(u"Usage d'un document")
        verbose_name_plural = _(u"Usages d'un document")
        ordering = ('order', 'name')

    def __unicode__(self):
        """Represent the usage by its name."""
        return self.name
|
||
|
||
class DocumentLicence(models.Model):
    """Diffusion rights that can be attached to a document.

    Rows are ordered by ``order`` and then by ``name``.
    """

    # To add a new kind of diffusion, add it here (or call
    # ``add_diffusion_tag`` at runtime).
    DIFFUSION_TAGS = [
        ('oai-pmh', _(u'Pousser le document dans le flux OAI-PMH')),
        ('public-url', _(u'Le document possède une URL publique')),
    ]

    name = models.CharField(
        max_length=64,
        verbose_name=_(u'Droit de diffusion'))
    description = models.TextField(
        verbose_name=_(u'Description'),
        blank=True)
    url = models.URLField(
        max_length=200,
        verbose_name=_(u'URL'),
        blank=True)
    comments = models.TextField(
        verbose_name=_(u'Remarques et commentaires'),
        blank=True)
    order = models.IntegerField(
        verbose_name=_(u'Ordre'),
        default=0)
    default = models.BooleanField(
        verbose_name=_(u'Valeur par défaut'),
        blank=True,
        default=False)
    # If the document contains copyrighted excerpts, it cannot be diffused.
    only_free_documents = models.BooleanField(
        verbose_name=_(u'Réservé aux'
                       u' documents libres de droits'),
        blank=True,
        default=False)
    # The field shares the DIFFUSION_TAGS list object as its choices.
    diffusion_tags = fields.MultiSelectField(
        verbose_name=_(u"Tags de diffusion"),
        help_text=_(u"Ce sont les options diverses que l'ont peut "
                    u"associer à ce type de diffusion"),
        blank=True,
        choices=DIFFUSION_TAGS)

    class Meta:
        app_label = 'base'
        verbose_name = _(u"Diffusion d'un document")
        verbose_name_plural = _(u"Diffusions d'un document")
        ordering = ('order', 'name',)

    def __unicode__(self):
        """Represent the licence by its name."""
        return self.name

    @classmethod
    def add_diffusion_tag(cls, tag, description):
        """Register a ``(tag, description)`` pair in ``DIFFUSION_TAGS``.

        Nothing happens when ``tag`` is already registered; the existing
        description is kept.
        """
        known_tags = [existing for existing, _label in cls.DIFFUSION_TAGS]
        if tag not in known_tags:
            cls.DIFFUSION_TAGS.append((tag, description))
|
||
|
||
|
||
class DeliveryPlace(models.Model):
    """Place where a request can be delivered.

    Rows are ordered by ``order`` and then by ``name``.
    """

    order = models.IntegerField(
        verbose_name=_(u'Ordre'),
        default=0)
    name = models.CharField(
        max_length=64,
        verbose_name=_(u'Lieu de livraison'))
    description = models.TextField(
        verbose_name=_(u'Description'),
        blank=True)
    url = models.URLField(
        max_length=200,
        verbose_name=_(u'URL'),
        blank=True)
    comments = models.TextField(
        verbose_name=_(u'Remarques et commentaires'),
        blank=True)

    class Meta:
        app_label = 'base'
        verbose_name = _(u"Lieu de livraison")
        verbose_name_plural = _(u"Lieux de livraison")
        ordering = ('order', 'name',)

    def __unicode__(self):
        """Represent the delivery place by its name."""
        return self.name
|
||
|
||
def get_default_status():
    '''Return the one and only default status, otherwise an error.

    The lookup is a ``.get(default=True)`` on ``workflow.Status``.
    '''
    statuses = workflow.Status.objects
    return statuses.get(default=True)
|
||
|
||
def minimum_delivery_timedelta():
    '''Return ``app_settings.MINIMUM_DELIVERY_DELAY`` days as a timedelta.'''
    delay_in_days = app_settings.MINIMUM_DELIVERY_DELAY
    return datetime.timedelta(days=delay_in_days)
|
||
|
||
class RequestQueryset(utils.QuerySet):
    """QuerySet of ``Request`` objects with an aggregate cost helper."""

    def total_cost(self):
        """Return the summed cost of the requests in this queryset.

        Two implementations are used:

        * when ``utils.qs_use_postgres`` is true, the per-request estimate
          is computed in SQL through ``extra(select=...)`` and the stored
          ``cost`` of a request takes precedence over the estimate;
        * otherwise the estimate is computed in Python from the base profile
          and the selected choices.

        NOTE(review): the generic branch never looks at the stored ``cost``
        while the Postgres branch does -- confirm which one is intended.
        """
        zero = decimal.Decimal(0)
        # special case for Postgres so that it is fast in production
        if utils.qs_use_postgres(self):
            request_table = Request._meta.db_table
            cost_function1 = '({1}.ppp * {0}.nb_pages + {1}.ppd)*{0}.copies'.format(
                request_table, profile.Profile._meta.db_table)
            cost_function2 = '''SELECT SUM(({1}.ppp * {0}.nb_pages + {1}.ppd)*{0}.copies) FROM {1}, {2}
                WHERE {1}.id = {2}.profileoptionchoice_id AND {2}.request_id = {0}.id'''.format(
                request_table, profile.ProfileOptionChoice._meta.db_table,
                Request._meta.get_field('choices').rel.through._meta.db_table)

            qs1 = self \
                .prefetch_related(None) \
                .select_related(depth=0) \
                .extra(select={
                    'base_profile_estimated_cost': cost_function1,
                    'choices_estimated_cost': cost_function2
                })
            # Join the profile table by hand so that cost_function1 can
            # reference its columns.
            qs1.query.join((request_table, profile.Profile._meta.db_table,
                            Request._meta.get_field('base_profile').column,
                            profile.Profile._meta.get_field('id').column))
            return sum([r.cost or ((r.base_profile_estimated_cost or zero) + (r.choices_estimated_cost or zero)) for r in qs1])
        else:
            # generic implementation working everywhere
            qs = self.select_related(depth=0) \
                .select_related('base_profile') \
                .prefetch_related(None) \
                .prefetch_related('choices') \
                .only('nb_pages', 'copies', 'base_profile__ppp',
                      'base_profile__ppd', 'choices__ppp', 'choices__ppd')
            total_cost = zero
            for r in qs:
                copies = r.get_copies()
                if not r.base_profile or copies <= 0:
                    continue
                ppp = r.base_profile.ppp
                ppd = r.base_profile.ppd
                for c in r.choices.all():
                    ppp += c.ppp
                    ppd += c.ppd
                # nb_pages is nullable: go through get_nb_pages() so that a
                # missing page count counts as 0 instead of raising TypeError.
                total_cost += ((r.get_nb_pages() * ppp) + ppd) * copies
            return total_cost
|
||
|
||
class Counter(models.Model):
    """Per-month integer counter (one row per distinct ``month`` date)."""

    class Meta:
        app_label = 'base'

    # Unique date identifying the month this counter belongs to.
    month = models.DateField(unique=True)
    # Current value of the counter, starting at 0.
    counter = models.IntegerField(default=0)
|
||
|
||
class Request(models.Model):
    """A reprography request together with its document metadata."""

    COPYRIGHT_CHOICES = (
        (False, _(
            u"Le document ne contient pas d'extrait d'oeuvre soumise"
            u" au droit d'auteur")),
        # The second fragment starts with a space: without it the two
        # literals were glued together ("oeuvres(s)soumise(s)").
        (True, _(
            u"Le document contient un ou des extrait(s) d'oeuvres(s)"
            u" soumise(s) au droit d'auteur")),
    )

    NUMBER_RE = re.compile(r'(\d\d\d\d)-(\d+)-(\d+)')
    FINANCIAL_CODE_RE = re.compile(r'\d{6,}')

    class Meta:
        app_label = 'base'
        verbose_name = _(u'Demande de reprographie')
        verbose_name_plural = _(u'Demandes de reprographie')
        ordering = ('-creation_date', '-month_order')

    objects = RequestQueryset.as_manager()

    user = models.ForeignKey(User, verbose_name=_(u'Demandeur'))
    # -1 means "not numbered yet"; save() assigns the real value.
    month_order = models.IntegerField(
        verbose_name=_(u'Numéro'), default=-1,
        unique_for_month='creation_date')
    name = models.TextField(verbose_name=_(u'Titre'), blank=True)
    uploadfile = models.FileField(
        upload_to='upload', verbose_name=_(u'Fichier'),
        blank=True, max_length=512)
    nb_pages = models.PositiveIntegerField(
        verbose_name=_(u'Nombre de pages'), blank=True, null=True, default=0,
        help_text=_(u"""Une information exacte sur le nombre de pages et le titre facilitera la tâche du Service de Reprographie."""))
    usage = models.ForeignKey(
        DocumentUsage, verbose_name=_(u'Usage du document'),
        blank=True, null=True, on_delete=models.PROTECT,
        help_text=_(u"""L’usage de votre document sera utile si vous acceptez de l’envoyer à la Bibliothèque (Etape 6)."""))
    licence = models.ForeignKey(
        DocumentLicence, verbose_name=_(u'Droits de diffusion'),
        blank=True, null=True, on_delete=models.PROTECT)
    copyright = models.NullBooleanField(
        verbose_name=_(u"Déclaration de présence d'extraits protégés"),
        choices=COPYRIGHT_CHOICES, blank=True, null=True)
    # NOTE: an identical ``comments`` field used to be declared here as well;
    # it was overwritten by the declaration further down before Django ever
    # saw it, so only the effective one is kept.
    sponsor = models.CharField(
        max_length=128, verbose_name=_(u'Commanditaire'),
        help_text=_(u"Complétez cette information si vous faites cette demande pour un tiers."),
        blank=True)
    entity = models.ForeignKey(
        'Entity', verbose_name=_(u'Destinataires de ce document'),
        on_delete=models.PROTECT, blank=True, null=True,
        help_text=_(u"""Renseigner le diplôme ou l'UE concernée permettra un meilleur suivi de votre document."""))
    copies = models.PositiveIntegerField(
        verbose_name=_(u'Nombre d\'exemplaires'), blank=True, null=True,
        default=1, validators=[validators.MinValueValidator(1)])
    status = models.ForeignKey(
        workflow.Status, verbose_name=_(u'Status'),
        on_delete=models.PROTECT)
    base_profile = models.ForeignKey(
        'Profile',
        verbose_name=_(u'Profil de reprographie'),
        help_text=_(u"Choissez le profil de base, puis complétez si "
                    u"besoin vos choix dans les options ci-dessous."),
        blank=True, null=True, on_delete=models.PROTECT,
        limit_choices_to={'visible': True})
    choices = models.ManyToManyField(
        'ProfileOptionChoice',
        verbose_name=_(u'Tous les choix de reprographie'), blank=True)
    details = models.TextField(
        verbose_name=_(u'Vos observations ou commentaires'), blank=True)
    delivery_date = models.DateField(
        verbose_name=_(u'Date de livraison souhaitée'),
        blank=True, null=True,
        help_text=string_concat(_(u'Délai minimum à respecter '), app_settings.MINIMUM_DELIVERY_DELAY, _(' jours.')),
        default=lambda: utils.get_next_workable_day(app_settings.MINIMUM_DELIVERY_DELAY))
    delivery_place = models.ForeignKey(
        DeliveryPlace, verbose_name=_(u'Lieu de livraison'),
        blank=True, null=True, on_delete=models.PROTECT)
    comments = models.TextField(
        verbose_name=_(u'Remarques et commentaires'), blank=True)
    creation_date = models.DateTimeField(
        verbose_name=_(u'Date de création'), default=now,
        editable=False, db_index=True)
    # Refreshed by save() on every write.
    modification_date = models.DateTimeField(
        verbose_name=_(u'Date de modification'),
        editable=False, db_index=True)
    contact_email = models.EmailField(verbose_name=_(u'Email'), blank=True)
    contact_telephone1 = models.CharField(
        max_length=32,
        verbose_name=_(u'Téléphone'),
        validators=[TELEPHONE_VALIDATOR],
        blank=True)
    contact_telephone2 = models.CharField(
        max_length=32,
        verbose_name=_(u'Téléphone (bis)'),
        validators=[TELEPHONE_VALIDATOR],
        blank=True)
    contact_bureau = models.CharField(
        max_length=64,
        verbose_name=_('Bureau'),
        blank=True)
    financial_code = models.CharField(
        max_length=64,
        verbose_name=_(u"Engagement provisionnel"), blank=True, db_index=True)
    financial_comment = models.TextField(
        verbose_name=_(u"Commentaire pour la facturation"), blank=True)
    # NOTE(review): max_digits=7 with decimal_places=5 leaves only two digits
    # before the decimal point (max 99.99999) -- confirm this is intended.
    cost = models.DecimalField(
        verbose_name=_(u'Coût'), blank=True, null=True,
        max_digits=7, decimal_places=5)
    is_from_remote_request = models.BooleanField(
        blank=True,
        verbose_name=_(u'Demande initiée via un Web-Service'),
        default=False)

    def copyright_status(self):
        """Return the label of the copyright declaration.

        ``_(u'Inconnu')`` is returned while no declaration has been made.
        """
        if self.copyright is None:
            return _(u'Inconnu')
        return dict(self.COPYRIGHT_CHOICES).get(self.copyright)

    def request_number(self):
        """Return the request number with non-breaking hyphens (U+2011)."""
        return self.request_number_csv().replace(u'-', u'\u2011')

    def request_number_csv(self):
        """Return the request number as ``YYYY-MM-<month_order>``."""
        s = unicode(self.creation_date.strftime('%Y-%m-'))
        s += unicode(self.month_order)
        return s

    def get_copies(self):
        """Return the number of copies, 0 when unset."""
        return self.copies or 0

    def get_nb_pages(self):
        """Return the number of pages, 0 when unset."""
        return self.nb_pages or 0

    def total_pages(self):
        """Return the number of pages multiplied by the number of copies."""
        return self.get_copies() * self.get_nb_pages()

    def sponsor_display(self):
        """Return the requestor name, followed by the sponsor in parentheses.

        The requestor name is the user's full name, or the username when the
        full name is empty. The sponsor is only appended when it is not
        blank and differs from that name.
        """
        fn = self.user.get_full_name()
        if not fn:
            fn = self.user.username
        if self.sponsor.strip() and self.sponsor != fn:
            return '%s (%s)' % (fn, self.sponsor.strip())
        return fn

    def sponsor_username(self):
        '''Try to extract a LDAP username from the sponsor field

        Returns the text found between parentheses, or '' when there is
        none.
        '''
        m = re.search(r'\((.*)\)', self.sponsor)
        if m:
            return m.group(1)
        return ''

    def estimated_cost(self):
        """Return the cost computed from the base profile and the choices.

        Each of them contributes ``(ppp * pages + ppd) * copies``.
        """
        total = decimal.Decimal(0)
        nb_pages = self.get_nb_pages()
        copies = self.get_copies()
        if self.base_profile:
            total += ((self.base_profile.ppp * nb_pages) + self.base_profile.ppd) * copies
        for choice in self.choices.all():
            total += ((choice.ppp * nb_pages) + choice.ppd) * copies
        return total

    def missing(self):
        """Yield the names of the required fields that are not filled in.

        A field counts as missing when its value is ``None``, an empty
        string or ``0``. ``'copyright'`` is additionally yielded when the
        document may be diffused but no copyright declaration was made.
        """
        required = ['name', 'nb_pages', 'usage',
                    'licence', 'entity', 'copies', 'base_profile',
                    'delivery_place', 'delivery_date']
        for field in required:
            v = getattr(self, field, None)
            # Compare by value: the previous identity test (``v is ''``)
            # never matched unicode empty strings such as u''.
            if v is None or v == '' or v == 0:
                yield field
        if not self.no_diffusion() and self.copyright is None:
            yield 'copyright'

    def validate(self):
        """Return True when no required field is missing."""
        return not list(self.missing())

    def save(self, *args, **kwargs):
        """Save the request, stamping it and numbering it when needed.

        ``modification_date`` is always refreshed. While ``month_order`` is
        still -1, the ``Counter`` row of the creation month is incremented
        and its new value becomes the request's ``month_order``.
        """
        self.modification_date = now()
        if self.month_order == -1:
            date = self.creation_date.date()
            date = date.replace(day=1)
            # Make sure the row exists, then read it again through
            # select_for_update() before incrementing it.
            # NOTE(review): the lock only helps inside a transaction --
            # confirm how callers manage transactions.
            Counter.objects.get_or_create(month=date)
            counter = Counter.objects.select_for_update().get(month=date)
            counter.counter += 1
            counter.save()
            self.month_order = counter.counter
        super(Request, self).save(*args, **kwargs)

    def act(self, who, workflow, description=None):
        """Apply a workflow transition to this request.

        For each mail notification attached to ``workflow.action`` the
        recipients are resolved, the subject and body templates are rendered
        and the mail is sent; a sending failure is logged and does not abort
        the transition. A ``History`` row is then created and the request is
        saved with ``workflow.destination`` as its new status.

        :param who: the acting user (``who.email`` is read).
        :param workflow: the transition; its ``action`` and ``destination``
            attributes are used. The name shadows the ``workflow`` module
            inside this method.
        :param description: optional comment, stored in the history.
        :raises NotImplementedError: for an unknown notification target.
        """
        from .. import rbac
        history = self.history_set.all()
        ctx = Context({
            'request': self,
            'actor': who,
            'comment': description,
            'history': history,
            # A list rather than a one-shot iterator: the same context is
            # rendered twice (subject and body) for every notification.
            'reversed_history': list(reversed(history)),
            'request_url': settings.SITE_URL + self.get_absolute_url(),
        })
        logger.info('action %s par %s avec le commentaire %s sur la demande %s',
                    unicode(workflow).encode('utf-8'),
                    unicode(who).encode('utf-8'),
                    unicode(description).encode('utf-8'),
                    unicode(self.request_number()).encode('utf-8'))
        for mail_notification in workflow.action.mailnotification_set.all():
            emails = []
            if mail_notification.to == 'requestor':
                if self.user.email:
                    emails = [self.user.email]
            elif mail_notification.to == 'last_actor':
                # The loop variable must not be called ``history``: rebinding
                # it would break the following notifications. History.user
                # is nullable, hence the extra guard.
                for entry in reversed(history):
                    if (entry.old_status != entry.new_status
                            and entry.user and entry.user.email):
                        emails = [entry.user.email]
                        break
            elif mail_notification.to == 'all_actors':
                actors = set(h.user.email for h in history if h.user)
                emails = list(actors - set([who.email]))
            elif mail_notification.to == 'next_actors':
                emails = set(u.email for u in rbac.get_actors(workflow.destination, self.entity))
            else:
                raise NotImplementedError
            # remove empty strings
            emails = [address for address in emails if address]
            if not emails:
                continue
            prefix = '{% load url from future %}'
            subject = Template(prefix + mail_notification.subject_template).render(ctx)
            body = Template(prefix + mail_notification.body_template).render(ctx)
            logger.info("envoi d'un mail à %s",
                        (', '.join(emails)).encode('utf-8'))
            email = EmailMessage(subject=subject, body=body, to=emails)
            try:
                email.send()
            except Exception:
                # Best effort: a mail failure must not block the transition.
                logger.exception("l'envoi du mail a echoue")

        History.objects.create(request=self,
                               user=who,
                               old_status=self.status,
                               new_status=workflow.destination,
                               action=workflow.action,
                               description=description or '')

        self.status = workflow.destination
        self.save()

    def file_name(self):
        """Return the base name of the uploaded file, '' when there is none."""
        if self.uploadfile:
            return os.path.basename(self.uploadfile.name)
        return ''

    def sponsor_name(self):
        """Return the sponsor text preceding any '(' or the user display name.

        The user's ``display_name()`` is used when that text is blank.
        """
        sponsor_name = re.match(r'[^(]*', self.sponsor).group(0).strip()
        if not sponsor_name:
            sponsor_name = self.user.display_name()
        return sponsor_name

    def is_paper(self):
        """Return True when no file was uploaded."""
        return not bool(self.uploadfile)

    def no_diffusion(self):
        """Return a truthy value when diffusion must not be offered.

        That is the case for remote requests, for usages flagged
        ``no_diffusion`` and for paper documents.
        """
        return self.is_from_remote_request or \
            (self.usage and self.usage.no_diffusion) \
            or self.is_paper()

    def clean(self):
        """Force the licence when the document cannot be diffused."""
        if self.no_diffusion():
            # NOTE(review): hard-coded primary key; presumably the
            # "no diffusion" licence row -- confirm against the data.
            self.licence = DocumentLicence(id=1)

    def __unicode__(self):
        """Return a short technical representation with the primary key."""
        return '<Request id:%s>' % (self.pk,)

    def get_absolute_url(self):
        """Return the URL of the ``request_detail`` view for this request."""
        return reverse('request_detail', kwargs={'pk': self.pk})
|
||
|
||
class HistoryManager(models.Manager):
    """Manager for ``History`` with helpers to find notable entries."""

    def start(self):
        """Return the latest entry whose new status is a start status.

        Returns None when there is no such entry.
        """
        started = self.filter(new_status__start=True)
        try:
            return started.latest()
        except History.DoesNotExist:
            return None

    def start_date(self):
        """Return the date of ``start()``, or the current time without one."""
        first = self.start()
        return first.date if first else now()

    def end(self):
        """Return the oldest entry of the trailing run of end statuses.

        Entries are walked in reverse order; the walk stops at the first
        entry whose new status is not an end status and returns the last end
        entry seen so far (None when the very first one is not an end).

        NOTE(review): when every entry has an end status the loop is
        exhausted and None is returned implicitly -- confirm this is
        intended. The original indentation of this method was lost; it is
        read here as a single if/elif/else chain inside the loop.
        """
        candidate = None
        for history in self.select_related('new_status').reverse():
            if not history.new_status.end:
                return candidate
            candidate = history

    def end_date(self):
        """Return the date of ``end()``, or None without one."""
        last = self.end()
        return None if last is None else last.date

    def last_show_in_details(self):
        """Return the first entry, in reverse order, flagged show_in_details.

        The flag is the ``special_type`` of the entry's action. Returns None
        when no entry matches.
        """
        flagged = (h for h in self.all().reverse()
                   if h.action.special_type == 'show_in_details')
        return next(flagged, None)


# Shared manager instance, assigned to ``History.objects``.
history_manager = HistoryManager()
# Django manager flag; presumably so that related accessors such as
# ``request.history_set`` also use this manager class -- see the Django
# manager documentation.
history_manager.use_for_related_fields = True
|
||
|
||
class History(models.Model):
    """One recorded action on a request, with the status before and after.

    Entries are ordered by ``date``, which is also the ``latest()`` key.
    """

    objects = history_manager

    # NOTE(review): auto_now refreshes the date on every save of the row,
    # not only at creation -- confirm this is intended.
    date = models.DateTimeField(auto_now=True, db_index=True)
    request = models.ForeignKey(Request, verbose_name=_(u'Demande'))
    # Nullable: the user reference is set to NULL when the user is deleted.
    user = models.ForeignKey(
        User,
        verbose_name=_(u'Utilisateur'),
        blank=True,
        null=True,
        on_delete=models.SET_NULL)
    action = models.ForeignKey(
        workflow.Action,
        verbose_name=_('Action'),
        on_delete=models.PROTECT)
    old_status = models.ForeignKey(
        workflow.Status,
        verbose_name=_(u"État de la demande avant l'action"),
        blank=True,
        related_name='+',
        null=True,
        on_delete=models.PROTECT)
    new_status = models.ForeignKey(
        workflow.Status,
        verbose_name=_(u"État de la demande après l'action"),
        related_name='+',
        on_delete=models.PROTECT)
    description = models.TextField(
        verbose_name=_(u"Détails éventuels sur l'action"),
        blank=True)

    class Meta:
        app_label = 'base'
        verbose_name = _(u"Historique")
        verbose_name_plural = _(u"Historiques des demandes")
        ordering = ('date',)
        get_latest_by = 'date'

    def __unicode__(self):
        """Return the entry timestamp followed by its action."""
        stamp = self.date.strftime('%Y-%m-%d %H:%M:%S ')
        return stamp + unicode(self.action)
|