docbow/docbow_project/docbow/models.py

782 lines
29 KiB
Python

import os
import datetime as dt
import random
import hashlib
import fnmatch
import re
from collections import defaultdict
from django.db.models import (Model, ForeignKey, DateTimeField, CharField,
FileField, ManyToManyField, TextField, Manager, BooleanField,
OneToOneField, Q, EmailField, PositiveSmallIntegerField)
from django.contrib.auth.models import User, Group
from django.template.defaultfilters import slugify
from django.utils.translation import ugettext_lazy as _, pgettext_lazy
from django.conf import settings
from django.core.urlresolvers import reverse
from picklefield.fields import PickledObjectField
from django.utils.timezone import now
from django.forms import ValidationError
import django_journal
import timestamp
from .validators import validate_phone
from .utils import file_match_mime_types
from . import app_settings
# Module-level lazily translated strings.  DOCBOW_APP is assigned twice, so
# only the second value ('Docbow_App') is still bound once the module is loaded.
# NOTE(review): presumably these assignments exist only so the three strings
# are picked up for translation -- confirm before removing the duplicate.
DOCBOW_APP = _('docbow')
DOCBOW_APP = _('Docbow_App')
DOCBOW_APP2 = _('Docbow_app')
class GetByNameManager(Manager):
    '''Manager resolving natural keys through the ``name`` field.

    The model using this manager MUST declare ``name`` as unique.
    '''

    def get_by_natural_key(self, name):
        '''Return the object whose ``name`` equals the given value.'''
        lookup = {'name': name}
        return self.get(**lookup)
class ContentManager(Manager):
    '''Manager resolving natural keys through the ``description`` field.

    The model using this manager MUST declare ``description`` as unique.
    '''

    def get_by_natural_key(self, description):
        '''Return the object whose ``description`` equals the given value.'''
        lookup = {'description': description}
        return self.get(**lookup)
class NameNaturalKey(object):
    '''Mixin exposing the ``name`` attribute as the natural key of a model.

    The ``name`` field MUST be unique.
    '''

    def natural_key(self):
        '''Return the natural key as a one-element tuple holding the name.'''
        key = self.name
        return (key,)
class FileType(NameNaturalKey, Model):
    '''A type of file that can be sent inside the application.

    Instances are identified by their unique ``name``, which is also their
    natural key.
    '''

    class Meta:
        ordering = ['name']
        verbose_name = _('File type')
        verbose_name_plural = _('File types')

    objects = GetByNameManager()

    name = CharField(unique=True, max_length=128)
    is_active = BooleanField(
        default=True,
        blank=True,
        verbose_name=_('is active'))

    def __unicode__(self):
        '''Display a file type by its name.'''
        return self.name
class FileTypeAttachedFileKindManager(Manager):
    '''Manager resolving the (name, file type name) natural key.'''

    def get_by_natural_key(self, name, file_type_name):
        '''Return the kind with the given name attached to the named file type.'''
        lookup = {'name': name, 'file_type__name': file_type_name}
        return self.get(**lookup)
class FileTypeAttachedFileKind(Model):
    '''A kind of attached file declared for a file type.

    Each kind has a name (unique per file type), an optional whitespace
    separated list of accepted mime types, a cardinality and a position used
    for ordering.
    '''
    # Pattern for ONE mime type: a top-level type, a slash, then either a
    # wildcard or a subtype.  The same sub-pattern is used for every entry of
    # the list; previously the first entry refused '.' while later entries
    # accepted it, so 'application/vnd.ms-excel' was only valid when it was
    # not first.  Digits and '+' are accepted as well since registered
    # subtypes use them (e.g. 'video/mp4', 'image/svg+xml').
    _MIME_TYPE = (r'(?:text|image|audio|application|video)'
                  r'/(?:\*|[a-z0-9.+-]+)')
    # Empty string, or one or more mime types separated by whitespace.
    MIME_TYPES_RE = re.compile(
        r'^\s*(?:%(one)s(?:\s+%(one)s)*\s*)?$' % {'one': _MIME_TYPE})

    objects = FileTypeAttachedFileKindManager()

    name = CharField(max_length=128, verbose_name=_('name'))
    file_type = ForeignKey('FileType', verbose_name=_('document type'))
    mime_types = TextField(verbose_name=_('mime types'),
            help_text=_('mime types separated by spaces, wildcards are allowed'),
            blank=True)
    cardinality = PositiveSmallIntegerField(default=0,
            verbose_name=_('cardinality'))
    position = PositiveSmallIntegerField(verbose_name=_('position'))

    def clean(self):
        '''Validate the mime types list.

        Raises ValidationError when ``mime_types`` is not empty and does not
        match MIME_TYPES_RE.
        '''
        if self.mime_types:
            if not self.MIME_TYPES_RE.match(self.mime_types):
                raise ValidationError(_('invalid mime types list'))

    def get_mime_types(self):
        '''Return the list of declared mime types, without empty entries.'''
        return [mime_type
                for mime_type in re.split(r'\s+', self.mime_types.strip())
                if mime_type]

    def match_file(self, file_like):
        '''Return the result of matching ``file_like`` against the declared
        mime types, as computed by ``file_match_mime_types``.'''
        return file_match_mime_types(file_like, self.get_mime_types())

    def natural_key(self):
        '''Return the natural key: (name, name of the file type).'''
        return (self.name, self.file_type.name)

    def __unicode__(self):
        return self.name

    class Meta:
        ordering = ('file_type', 'position', 'name')
        unique_together = (('name', 'file_type'),)
        verbose_name = _('file type attached file kind')
        verbose_name_plural = _('file type attached file kinds')
class Content(Model):
    '''Predefined content type, identified by its unique description.'''

    class Meta:
        ordering = ['description']
        verbose_name = _('Content')
        verbose_name_plural = _('Contents')

    objects = ContentManager()

    description = CharField(unique=True, max_length=128)

    def __unicode__(self):
        '''Display a content by its description.'''
        return self.description
def username(user):
    '''Return the display name of a user.

    The full name is preferred; the login name is used when the full name is
    empty.
    '''
    full_name = user.get_full_name()
    if full_name:
        return full_name
    return user.username
def all_emails(user):
    '''Return the list of email addresses known for a user.

    The account email comes first when set, followed by the personal email of
    the docbow profile when the profile exists and the address is set.
    '''
    primary = user.email
    emails = [primary] if primary else []
    try:
        personal = user.docbowprofile.personal_email
    except DocbowProfile.DoesNotExist:
        # No profile for this user: only the account email is known.
        return emails
    if personal:
        emails.append(personal)
    return emails
def generate_filename(instance, filename):
    '''Build the storage path of an uploaded file.

    The path is ``files/<today in ISO format>/<random integer>_<name>`` where
    each dot-separated part of the original name is slugified.  ``instance``
    is not used.
    '''
    today = dt.date.today()
    # Slugify each part separately so that the dots are kept.
    safe_name = '.'.join(slugify(part) for part in filename.split('.'))
    prefix = random.randint(0, 10000000)
    stored_name = '%d_%s' % (prefix, safe_name)
    return os.path.join('files', today.isoformat(), stored_name)
# Codename of the custom permission declared in Document.Meta.permissions.
FORWARD_PERMISSION = 'FORWARD_DOCUMENT'
class DocumentManager(Manager):
    '''Manager whose base queryset eagerly loads related objects.'''
    use_for_related_fields = True

    def get_query_set(self):
        '''Return the base queryset with ``select_related()`` applied and the
        attached files prefetched.'''
        base = super(DocumentManager, self).get_query_set()
        with_relations = base.select_related()
        return with_relations.prefetch_related('attached_files')
class Document(Model):
    '''
    Represent a file sent between a user and some targets, user or groups.
    '''
    objects = DocumentManager()

    class Meta:
        ordering = ['-date']
        verbose_name = _('Document')
        verbose_name_plural = _('Documents')
        permissions = (
            (FORWARD_PERMISSION, _("Can forward documents")),
        )

    sender = ForeignKey(User, verbose_name=_('Sender'),
            related_name='documents_sent')
    real_sender = CharField(max_length=64, blank=True, verbose_name=_('Real sender'))
    date = DateTimeField(default=now,
            verbose_name=_("Date d'envoi"))
    to_user = ManyToManyField(User, related_name='directly_received_documents',
            blank=True, null=True, verbose_name=_('Users to send to'))
    to_list = ManyToManyField('MailingList', blank=True, null=True,
            verbose_name=_('Groups to send to'))
    filetype = ForeignKey(FileType, verbose_name=_('Document type'),
            limit_choices_to={'is_active': True})
    comment = TextField(blank=True, verbose_name=_('Comments'))
    # Cached result of timestamp(); empty until the document is timestamped.
    _timestamp = TextField(blank=True)
    reply_to = ForeignKey('self', verbose_name=_('Reply to'),
            blank=True, null=True, related_name='replies')

    def __unicode__(self):
        '''Return a displayable representation of the document sending.'''
        ctx = {
            'id': self.id,
            'date': self.date.date().isoformat(),
            'filetype': self.filetype,
            'comment': self.comment,
            'sender': username(self.sender),
        }
        return _('document {id} sent on {date} of type {filetype} about {comment} by {sender}').format(**ctx)

    def filenames(self):
        '''Returns a display string for the list of attached files'''
        return ', '.join([f.filename() for f in self.attached_files.all()])
    filenames.short_description = _('Attached files')

    def filename_links(self):
        '''Returns a display string containing links to download attached
           files, only usable in the admin application.

           Files are ordered by kind; the first file of each kind is prefixed
           with the name of the kind.
        '''
        links = []
        qs = self.attached_files.all()
        qs = qs.order_by('kind__position', 'kind__name', 'id')
        last_kind_name = ''
        for attached_file in qs:
            kind = attached_file.kind
            name = attached_file.filename()
            url = attached_file.link()
            links.append(u'<a href="%s">%s</a>' % (url, name))
            if kind and kind.name != last_kind_name:
                links[-1] = kind.name + '&nbsp;: ' + links[-1]
                last_kind_name = kind.name
        return ', '.join(links)
    filename_links.short_description = _('Attached files')
    filename_links.allow_tags = True

    def user_human_to(self):
        '''Return a sorted list of display names for user recipients.'''
        return sorted([username(user) for user in self.to_user.all()])

    def group_human_to(self):
        '''Return a sorted list of display names for list recipients.'''
        return sorted([unicode(mailing_list)
                       for mailing_list in self.to_list.all()])

    def human_to(self):
        '''Return a sorted list of display names for all recipients.'''
        names = [username(user) for user in self.to_user.all()]
        names.extend(unicode(mailing_list)
                     for mailing_list in self.to_list.all())
        return sorted(names)

    def recipients(self):
        '''Return a comma separated sorted list of display names for all
           recipients.
        '''
        return ', '.join(self.human_to())
    recipients.short_description = _('Recipients')

    def to(self):
        '''Returns the set of all user recipients, direct or through a list.'''
        recipients = set()
        recipients.update(self.to_user.all())
        for mailing_list in self.to_list.all():
            recipients.update(mailing_list.recursive_members())
        return recipients

    def to_with_origin(self):
        '''Returns a mapping of each recipient to the set of its origins.

           An origin is either a mailing list through which the user is
           reached or the string '--direct--' for direct recipients.
        '''
        recipients = defaultdict(set)
        for mailing_list in self.to_list.all():
            origins = mailing_list.recursive_members_with_origin()
            # Merge instead of dict.update(): a user reached through several
            # recipient lists must keep the origins coming from all of them.
            for user, lists in origins.items():
                recipients[user] |= lists
        for user in self.to_user.all():
            recipients[user].add('--direct--')
        return recipients

    def delivered_to(self):
        '''Returns the users which received the document in their inbox.

           It can differ from ``to()`` if the members of a recipient
           mailing-list changed.
        '''
        # QuerySet.all() accepts no lookup; filter() is the call taking them.
        return User.objects.filter(documents__document=self,
                documents__outbox=False)

    def timestamp(self, to=None):
        '''Return the timestamp of the document, computing, saving and
           journaling it on first use.

           to - recipients to include in the timestamped blob; see
                timestamp_blob()
        '''
        if not self._timestamp:
            blob = self.timestamp_blob(to=to)
            self._timestamp = timestamp.timestamp_json(blob)
            self.save()
            django_journal.record('timestamp', 'timestamped document {document} result is {timestamp}',
                    document=self, timestamp=self._timestamp)
        return self._timestamp

    def post(self, forward=True):
        '''Deliver the document into inbox of all active users recipients and into
           outbox of the sender.

           forward - whether automatic forwarding rules must be tried

           Returns the number of recipients.
        '''
        to_with_origins = self.to_with_origin()
        to = to_with_origins.keys()
        # Create the timestamp; a failure is journaled and does not prevent
        # the delivery.
        try:
            self.timestamp(to=to)
        except timestamp.TimestampingError as e:
            django_journal.record('error', 'unable to timestamp {document}: '
                    '{exception}', document=self, exception=str(e))
        # Record recipient lists
        for mailing_list in self.to_list.all():
            django_journal.record('delivery', 'deliver document {document} to members of list {mailing_list}',
                    document=self, mailing_list=mailing_list)
        for user in to:
            if user.is_active:
                Mailbox.objects.get_or_create(owner=user, document=self)
                if '--direct--' not in to_with_origins[user] or \
                        len(to_with_origins[user]) > 1:
                    # The user is reached through at least one list.
                    lists = u', '.join(m.name for m in to_with_origins[user] if m != '--direct--')
                    if '--direct--' in to_with_origins[user]:
                        django_journal.record('delivery', 'deliver document '
                                '{document} in mailbox of user {recipient} '
                                'member of {lists} and as a direct recipient',
                                document=self, recipient=user, lists=lists)
                    else:
                        django_journal.record('delivery', 'deliver document '
                                '{document} in mailbox of user {recipient} '
                                'member of {lists}',
                                document=self, recipient=user, lists=lists)
                else:
                    django_journal.record('delivery', 'deliver document {document} in mailbox of user {recipient} as a direct recipient',
                        document=self, recipient=user)
        # Deliver to ouput mailbox of the sender
        Mailbox.objects.get_or_create(owner=self.sender, outbox=True, document=self)
        django_journal.record('delivery', 'deliver document {document} in output mailbox of user {recipient}',
                document=self, recipient=self.sender)
        # Push notifications
        Notification.objects.notify(document=self, users=to)
        if forward:
            AutomaticForwarding.try_forwarding(self)
        return len(to)

    def forward(self, new_sender, lists, users, automatic=False):
        '''Forward the document to new recipients.

           new_sender - new sender
           lists - new mailing lists recipients
           users - new users recipients
           automatic - whether this forwarding is the result of an
           automatic treatment

           Returns a pair (number of recipients, DocumentForwarded object).
        '''
        document = Document.objects.create(sender=new_sender,
                filetype=self.filetype,
                comment=self.comment)
        document.to_user = users
        document.to_list = lists
        attached_files = [ AttachedFile(name=attached_file.name,
                    content=attached_file.content,
                    document=document) for attached_file in self.attached_files.all()]
        AttachedFile.objects.bulk_create(attached_files)
        # forward=False: the copy must not trigger forwarding rules again.
        recipients_count = document.post(forward=False)
        return recipients_count, DocumentForwarded.objects.create(from_document=self,
                to_document=document, automatic=automatic)

    def timestamp_blob(self, to=None):
        '''Create a dictionary containing information to timestamp.

           It should be serialized (ex. using JSON) and cryptographically
           timestamped (ex. using RFC3161).

           to - recipients to list in the blob, defaults to self.to()
        '''
        to = to or self.to()
        blob = {}
        blob['from'] = username(self.sender)
        blob['date'] = self.date.isoformat()
        blob['to'] = ', '.join(map(username, to))
        blob['filetype'] = unicode(self.filetype)
        blob['comment'] = self.comment
        blob['files'] = []
        for f in self.attached_files.all():
            d = dict(name=f.filename(), size=f.content.size,
                    digest=hashlib.sha1(f.content.read()).hexdigest())
            blob['files'].append(d)
        return blob

    def url(self):
        '''Return the URL of the inbox view of this document, prefixed with
           app_settings.BASE_URL.'''
        return app_settings.BASE_URL + reverse('inbox-by-document-message',
                kwargs=dict(document_id=self.id))
class DeletedMailbox(Model):
    '''Deletion event recorded for a delegate on a mailbox entry of their
    delegator.'''
    # The mailbox entry concerned by the deletion.
    mailbox = ForeignKey('Mailbox')
    # The delegate for which the deletion is recorded.
    delegate = ForeignKey('auth.User')
class DocumentForwarded(Model):
    '''Link between a document and the copy created when forwarding it.

       ``automatic`` tells whether the forwarding was flagged as automatic
       when the link was created.
    '''
    from_document = ForeignKey(Document, related_name='document_forwarded_to')
    to_document = ForeignKey(Document, related_name='document_forwarded_from')
    date = DateTimeField(auto_now_add=True)
    automatic = BooleanField(default=False)

    def __unicode__(self):
        '''Describe the forwarding, mentioning when it was automatic.'''
        if self.automatic:
            template = _(u'forwarded document {from_document} as {to_document} on {date} automatically')
        else:
            template = _(u'forwarded document {from_document} as {to_document} on {date}')
        return template.format(
                from_document=self.from_document,
                to_document=self.to_document,
                date=self.date)
def list_to_csv(l, mapping_func=None):
    '''Join the unicode values of the items of a list with ", ".

       When ``mapping_func`` is given, it is applied to every item before the
       conversion to unicode.
    '''
    items = map(mapping_func, l) if mapping_func else l
    return u', '.join(unicode(item) for item in items)
class AutomaticForwarding(Model):
    '''
    Choice of sender and filetype to transfer mail automatically to a list
    of recipients.
    '''
    filetypes = ManyToManyField(FileType, related_name='forwarding_rules',
            verbose_name=_('filetype'),
            limit_choices_to={'is_active': True})
    originaly_to_user = ManyToManyField(User, related_name='as_original_recipient_forwarding_rules',
            blank=True, null=True, verbose_name=_('Original recipients'),
            help_text=_('At least one recipient must match for the rule to '
                'apply.'))
    forward_to_user = ManyToManyField(User, related_name='as_recipient_forwarding_rules',
            blank=True, null=True, verbose_name=_('Users to forward to'))
    forward_to_list = ManyToManyField('MailingList', blank=True, null=True,
            verbose_name=_('Groups to forward to'), related_name='as_recipient_forwarding_rules')

    def __unicode__(self):
        '''Return a display string for the forwarding rule.'''
        # Evaluate each relation once instead of once per test.
        filetypes = list(self.filetypes.all())
        original_recipients = list(self.originaly_to_user.all())
        targets = [username(user) for user in self.forward_to_user.all()]
        targets.extend(self.forward_to_list.all())
        ctx = {
            'filetypes': list_to_csv(filetypes),
            # The key must be spelled like the placeholder of the templates
            # below, otherwise format() raises KeyError.
            'originaly_to_user': list_to_csv(original_recipients, username),
            'to': list_to_csv(targets),
        }
        # A rule with neither filetypes nor original recipients falls back to
        # the last template rather than making the display fail.
        if filetypes and original_recipients:
            tpl = _('Forward documents of type {filetypes} and to {originaly_to_user} automatically to {to}')
        elif filetypes:
            tpl = _('Forward documents of type {filetypes} automatically to {to}')
        else:
            tpl = _('Forward documents to {originaly_to_user} automatically to {to}')
        return tpl.format(**ctx)

    @classmethod
    def try_forwarding(cls, document):
        '''Try applying matching rules to the document.

           A rule matches when it lists the filetype of the document and
           either has no original recipient or has at least one original
           recipient among the recipients of the document.  Each forwarding
           is journaled.
        '''
        matching = (Q(filetypes=document.filetype,
                      originaly_to_user__in=document.to())
                    | Q(filetypes=document.filetype,
                        originaly_to_user__isnull=True))
        # distinct(): the join on the original recipients yields one row per
        # matching recipient, which would forward the document several times
        # for a single rule.
        for rule in cls.objects.filter(matching).distinct():
            recipients_count, document_forwarded = document.forward(document.sender,
                    users=rule.forward_to_user.all(),
                    lists=rule.forward_to_list.all(),
                    automatic=True)
            django_journal.record('automatic-forward', 'document {document} '
                'automatically forwarded to {recipients_count} new recipients '
                'as document {new_document}', document=document_forwarded.from_document,
                new_document=document_forwarded.to_document,
                recipients_count=recipients_count,
                document_forwarded=document_forwarded)

    class Meta:
        verbose_name = _('Automatic forwarding rule')
        verbose_name_plural = _('Automatic forwarding rules')
class AttachedFile(Model):
    '''Uploaded file attached to a Document'''
    name = CharField(max_length=128, verbose_name=_('Name'))
    content = FileField(upload_to=generate_filename, verbose_name=_('File'))
    document = ForeignKey(Document, verbose_name=_('Attached to'),
            related_name='attached_files')
    kind = ForeignKey('FileTypeAttachedFileKind', blank=True, null=True,
            verbose_name=_('attached file kind'))

    def filename(self):
        '''Extract the original filename from the generated filename used for
           storage.

           The stored name has the form ``<digits>_<name>``; the part before
           the first underscore is removed when it is made of digits only.
           Any other stored name is returned as is.
        '''
        filename = os.path.basename(self.content.name)
        # partition() never raises, so no exception handler is needed for
        # names without an underscore.
        prefix, separator, true_filename = filename.partition('_')
        if separator and prefix.isdigit():
            return true_filename
        return filename

    def link(self):
        '''Returns the link for downloading the attached file in the admin
           application.
        '''
        return '/admin/docbow/attachedfile/%s/download/' % self.id

    def __unicode__(self):
        return self.name
def is_guest(user):
    '''Return the guest flag of the docbow profile of a user, or False when
    the user has no profile.'''
    try:
        profile = user.docbowprofile
    except DocbowProfile.DoesNotExist:
        return False
    return profile.is_guest
class Delegation(Model):
    '''
    Delegate account, managable by user themselves.
    '''
    class Meta:
        ordering = [ 'by' ]
        verbose_name = _('Account delegation')
        verbose_name_plural = _('Account delegations')
        db_table = 'auth_delegation'
        unique_together = (('by', 'to'),)

    # The user delegating its account.
    by = ForeignKey(User, related_name='delegations_to',
            verbose_name=pgettext_lazy('delegation from', "From"))
    # The user receiving the delegation.
    to = ForeignKey(User, related_name='delegations_by',
            verbose_name=pgettext_lazy('delegation to', "To"))

    def __unicode__(self):
        return u'delegation from {0}:{0.id} to {1}:{1.id}'.format(self.by, self.to)

    @property
    def guest_delegate(self):
        '''Guest flag of the profile of the delegate, False when the delegate
           or its profile does not exist.'''
        # Imported here so that the method does not rely on the module-level
        # import list.
        from django.core.exceptions import ObjectDoesNotExist
        try:
            return self.to.docbowprofile.is_guest
        except ObjectDoesNotExist:
            # Only a missing related object means "not a guest"; any other
            # error is a real problem and is no longer hidden.
            return False
class MailingListManager(GetByNameManager):
    '''Manager of mailing lists with membership helpers.'''

    def active(self):
        '''Return the mailing lists flagged as active.'''
        return self.filter(is_active=True)

    def is_member_of(self, user):
        '''Return the set of lists the user belongs to; see are_member_of().'''
        return self.are_member_of([user])

    def are_member_of(self, users):
        '''Return the set of lists having one of the users as a member,
           directly or through lists nested in other lists.'''
        found = set(MailingList.objects.filter(members__in=users))
        while True:
            size_before = len(found)
            # Add the lists containing one of the lists found so far.
            parents = set(MailingList.objects.filter(mailing_list_members__in=found))
            found |= parents
            if len(found) == size_before:
                # Nothing new was added: the closure is complete.
                return found
class MailingList(NameNaturalKey, Model):
    '''A named list of recipients made of users and of other mailing lists.'''

    class Meta:
        ordering = [ 'name' ]
        verbose_name = _('Mailing list')
        verbose_name_plural = _('Mailing lists')

    name = CharField(max_length=128, verbose_name=_('Name'))
    members = ManyToManyField(User, verbose_name=_('Members'), blank=True,
            null=True, related_name='mailing_lists')
    mailing_list_members = ManyToManyField('MailingList',
            verbose_name=_('Mailing lists members'),
            blank=True, related_name='members_lists')
    is_active = BooleanField(verbose_name=_('is active'), blank=True,
            default=True)

    objects = MailingListManager()

    def recursive_members(self, sublist_traversed=None):
        '''Return the set of users of this list and of all its sublists.

           sublist_traversed - set of sublists already visited, shared between
           the recursive calls so that each sublist is visited only once.
        '''
        if sublist_traversed is None:
            sublist_traversed = set()
        collected = set(self.members.all())
        for child in self.mailing_list_members.all():
            if child in sublist_traversed:
                continue
            sublist_traversed.add(child)
            collected.update(child.recursive_members(sublist_traversed))
        return collected

    def recursive_members_with_origin(self, sublist_traversed=None):
        '''Return a mapping of each user of this list and of its sublists to
           the set of lists through which the user is reached.

           sublist_traversed - mapping counting the visits of each list,
           shared between the recursive calls; a sublist is not entered any
           more once it has been visited 3 times.
        '''
        if sublist_traversed is None:
            sublist_traversed = defaultdict(int)
        sublist_traversed[self] += 1
        origins = defaultdict(set)
        for user in self.members.all():
            origins[user].add(self)
        for child in self.mailing_list_members.all():
            if sublist_traversed[child] >= 3:
                continue
            nested = child.recursive_members_with_origin(sublist_traversed)
            for user, list_set in nested.iteritems():
                # Keep the lists found below and add this list to the path.
                origins[user] |= list_set
                origins[user].add(self)
        return origins

    def __unicode__(self):
        return self.name
class Mailbox(Model):
    '''List of document received by a user'''
    owner = ForeignKey(User, verbose_name=_('Mailbox owner'),
            related_name='documents')
    # The label is marked for translation like the label of the other fields.
    document = ForeignKey(Document, verbose_name=_('Document'),
            related_name='mailboxes')
    seen = BooleanField(verbose_name=_('Seen'), blank=True)
    deleted = BooleanField(verbose_name=_('Deleted'), blank=True)
    outbox = BooleanField(verbose_name=_('Outbox message'), blank=True,
            default=False)
    date = DateTimeField(auto_now_add=True)

    class Meta:
        ordering = [ '-date' ]
        verbose_name = _('Mailbox')
        verbose_name_plural = _('Mailboxes')

    def __unicode__(self):
        '''Describe the mailbox entry, mentioning its seen and deleted
           flags when they are set.'''
        # Only the template depends on the flags; the four messages are kept
        # as separate literals so that each one stays translatable.
        if self.seen and self.deleted:
            template = _(u'seen and deleted mailbox entry {id} of user {user}:{user.id} created on '
                         u'{date} for {document}')
        elif self.deleted:
            template = _(u'deleted mailbox entry {id} of user {user}:{user.id} created on '
                         u'{date} for {document}')
        elif self.seen:
            template = _(u'seen mailbox entry {id} of user {user}:{user.id} created on '
                         u'{date} for {document}')
        else:
            template = _(u'mailbox entry {id} of user {user}:{user.id} created on '
                         u'{date} for {document}')
        return template.format(id=self.id, user=self.owner,
                date=self.date,
                document=self.document)
class DocbowUser(User):
    '''Proxy of the auth User model, registered under the ``auth`` app.'''

    class Meta:
        proxy = True
        app_label = 'auth'
        verbose_name = _('Docbow admin user')
class DocbowGroup(Group):
    '''Proxy of the auth Group model, registered under the ``auth`` app.'''

    class Meta:
        proxy = True
        app_label = 'auth'
        verbose_name = _('Docbow admin group')
class Inbox(Mailbox):
    '''Proxy of Mailbox displayed under the "Inbox" label.'''

    class Meta:
        verbose_name = _('Inbox')
        verbose_name_plural = _('Inboxes')
        proxy = True
class Outbox(Mailbox):
    '''Proxy of Mailbox displayed under the "Outbox" label.'''

    class Meta:
        verbose_name = _('Outbox')
        verbose_name_plural = _('Outboxes')
        proxy = True
class SendingLimitation(Model):
    '''Limitation attached to a mailing list: a set of file types and a set
       of recipient lists.

       NOTE(review): how the limitation is enforced is not visible in this
       module -- see the callers.
    '''
    mailing_list = OneToOneField(MailingList, unique=True,
            verbose_name=MailingList._meta.verbose_name)
    filetypes = ManyToManyField(FileType, blank=True,
            related_name='filetype_limitation',
            verbose_name=_('Limitation des types de fichier'),
            limit_choices_to={'is_active': True})
    lists = ManyToManyField(MailingList, related_name='lists_limitation',
            verbose_name=_('Limitation des destinataires'))

    class Meta:
        # The same label is used for the singular and the plural forms.
        verbose_name = _('Limitation par liste de destinataires')
        verbose_name_plural = verbose_name

    def __unicode__(self):
        '''Describe the limitation with its list, file types and lists.'''
        # The first literal ends with a space: the two literals are
        # concatenated and the file types were glued to the word "filetypes".
        return _(u'sending limitation for list {mailing_list} to filetypes '
                 u'{filetypes} and lists {lists}').format(
                mailing_list=self.mailing_list,
                filetypes=list_to_csv(self.filetypes.all()),
                lists=list_to_csv(self.lists.all()))
class DocbowProfile(Model):
    '''Extra attributes of a user: guest flag, mobile phone and personal
       email address.'''
    user = OneToOneField(User, unique=True)
    is_guest = BooleanField(blank=True, verbose_name=_('Guest user'))
    mobile_phone = CharField(
            max_length=32,
            blank=True,
            validators=[validate_phone],
            verbose_name=_('Mobile phone'))
    personal_email = EmailField(
            _('personal email address'),
            blank=True,
            help_text=_('if you provide a personal email address, notifications '
                'of new documents will also be sent to this address.'))

    def __unicode__(self):
        '''Describe the profile with its user, phone and personal email.'''
        template = _(u'docbow profile of {user}:{user.id} with mobile phone '
                     u'{mobile_phone} and personal email {personal_email}')
        return template.format(
                user=self.user,
                mobile_phone=self.mobile_phone,
                personal_email=self.personal_email)
class NotificationManager(Manager):
    '''Manager creating notifications for several users at once.'''

    def notify(self, document=None, users=None, kind='new-document', ctx=None):
        '''Build notifications in bulk.

           document - the document related to the notifications
           users - iterable of recipients; None (the default) or an empty
                   iterable creates nothing
           kind - the kind of the notifications
           ctx - context object stored with each notification
        '''
        # ``users`` defaults to None, which cannot be iterated.
        notifications = [Notification(user=user, document=document, kind=kind, ctx=ctx)
                for user in users or ()]
        if notifications:
            self.bulk_create(notifications)
class Notification(Model):
    '''Asynchronous notification.

       Fields:
         create_dt - date and time of creation
         document  - the related document
         user      - the recipient of the notification
         kind      - the kind of notification
         done      - notification has been done successfully or not
         failure   - report of the last failure, if done is True, the
                     notification was successful if this is empty.
         ctx       - a pickled object
    '''

    class Meta:
        ordering = ('-id',)

    objects = NotificationManager()

    create_dt = DateTimeField(auto_now_add=True)
    document = ForeignKey(Document, null=True, blank=True)
    user = ForeignKey(User, null=True, blank=True)
    kind = CharField(default='new-document', max_length=32)
    done = BooleanField(default=False, blank=True)
    failure = TextField(null=True, blank=True)
    ctx = PickledObjectField(null=True, blank=True)

    def __unicode__(self):
        '''Display the notification as its kind followed by its id.'''
        template = _(u'notification {0}:{1}')
        return template.format(self.kind, self.id)
def guest_users():
    '''Return the queryset of users whose docbow profile is flagged guest.'''
    guests_only = Q(docbowprofile__is_guest=True)
    return User.objects.filter(guests_only)
def non_guest_users():
    '''Return the queryset of users, excluding those whose docbow profile is
    flagged guest.'''
    guests_only = Q(docbowprofile__is_guest=True)
    return User.objects.exclude(guests_only)
# signals
from django.db.models.signals import m2m_changed
from django.dispatch import receiver
@receiver(m2m_changed, sender=User.groups.through)
def groups_changed(sender, instance, action, **kwargs):
    '''When a user get a group, give it access to the administration.

       After each change of the user/group relation, ``is_staff`` of the
       affected users is set to whether they belong to at least one group.

       The relation can be modified from both sides.  From the group side
       (``reverse`` is true) ``instance`` is a Group and the affected users
       are given by ``pk_set``.
       NOTE(review): a reverse ``post_clear`` carries no ``pk_set``, so the
       users removed by ``group.user_set.clear()`` are not updated here.
    '''
    if not action.startswith('post'):
        return
    if kwargs.get('reverse'):
        pk_set = kwargs.get('pk_set')
        users = User.objects.filter(pk__in=pk_set) if pk_set else []
    else:
        users = [instance]
    for user in users:
        user.is_staff = user.groups.exists()
        user.save()
# do not remove me
# pylint: disable=W0611
import signals