779 lines
29 KiB
Python
779 lines
29 KiB
Python
import os
|
|
import datetime as dt
|
|
import random
|
|
import hashlib
|
|
import fnmatch
|
|
import re
|
|
from collections import defaultdict
|
|
|
|
from django.db.models import (Model, ForeignKey, DateTimeField, CharField,
|
|
FileField, ManyToManyField, TextField, Manager, BooleanField,
|
|
OneToOneField, Q, EmailField, PositiveSmallIntegerField)
|
|
from django.contrib.auth.models import User, Group
|
|
from django.template.defaultfilters import slugify
|
|
from django.utils.translation import ugettext_lazy as _, pgettext_lazy
|
|
from django.conf import settings
|
|
from django.core.urlresolvers import reverse
|
|
from picklefield.fields import PickledObjectField
|
|
from django.utils.timezone import now
|
|
from django.forms import ValidationError
|
|
|
|
import django_journal
|
|
import timestamp
|
|
from .validators import validate_phone
|
|
from .utils import file_match_mime_types
|
|
from . import app_settings
|
|
|
|
DOCBOW_APP = _('docbow')
|
|
DOCBOW_APP = _('Docbow_App')
|
|
DOCBOW_APP2 = _('Docbow_app')
|
|
|
|
class GetByNameManager(Manager):
|
|
'''Manager providing a get_by_natural_key() method to retrieve object by
|
|
their name. The name field MUST be unique.'''
|
|
def get_by_natural_key(self, name):
|
|
return self.get(name=name)
|
|
|
|
|
|
class ContentManager(Manager):
|
|
'''Manager providing a get_by_natural_key() method to retrieve object by
|
|
their description. The description field MUST be unique.'''
|
|
def get_by_natural_key(self, description):
|
|
return self.get(description=description)
|
|
|
|
|
|
class NameNaturalKey(object):
|
|
'''Model mixin to export the name of a model as a natural key. The name
|
|
field MUST be unique.'''
|
|
def natural_key(self):
|
|
return (self.name,)
|
|
|
|
|
|
class FileType(NameNaturalKey, Model):
|
|
'''
|
|
A type of file that can be sent inside the application.
|
|
'''
|
|
objects = GetByNameManager()
|
|
|
|
name = CharField(max_length=128, unique=True)
|
|
is_active = BooleanField(verbose_name=_('is active'), default=True, blank=True)
|
|
|
|
def __unicode__(self):
|
|
return self.name
|
|
|
|
class Meta:
|
|
ordering = ['name']
|
|
verbose_name = _('File type')
|
|
verbose_name_plural = _('File types')
|
|
|
|
|
|
class FileTypeAttachedFileKindManager(Manager):
|
|
def get_by_natural_key(self, name, file_type_name):
|
|
return self.get(name=name, file_type__name=file_type_name)
|
|
|
|
|
|
class FileTypeAttachedFileKind(Model):
|
|
MIME_TYPES_RE = re.compile(r'^\s*(?:(?:text|image|audio|application|video)'
|
|
r'/(?:\*|[a-z-]+)(?:\s+(?:text|image|audio|application|video)'
|
|
r'/(?:\*|[a-z-.]+))*\s*)?$')
|
|
|
|
objects = FileTypeAttachedFileKindManager()
|
|
|
|
name = CharField(max_length=128, verbose_name=_('name'))
|
|
file_type = ForeignKey('FileType', verbose_name=_('document type'))
|
|
mime_types = TextField(verbose_name=_('mime types'),
|
|
help_text=('mime types separated by spaces, wildcards are allowed'),
|
|
blank=True)
|
|
cardinality = PositiveSmallIntegerField(default=0,
|
|
verbose_name=_('cardinality'))
|
|
position = PositiveSmallIntegerField(verbose_name=_('position'))
|
|
|
|
def clean(self):
|
|
if self.mime_types:
|
|
if not self.MIME_TYPES_RE.match(self.mime_types):
|
|
raise ValidationError(_('invalid mime types list'))
|
|
|
|
def get_mime_types(self):
|
|
return filter(None, re.split('\s+', self.mime_types.strip()))
|
|
|
|
def match_file(self, file_like):
|
|
return file_match_mime_types(file_like, self.get_mime_types())
|
|
|
|
def natural_key(self):
|
|
return (self.name, self.file_type.name)
|
|
|
|
def __unicode__(self):
|
|
return self.name
|
|
|
|
class Meta:
|
|
ordering = ('file_type', 'position', 'name')
|
|
unique_together = (('name', 'file_type'),)
|
|
verbose_name = _('file type attached file kind')
|
|
verbose_name_plural = _('file type attached file kinds')
|
|
|
|
|
|
class Content(Model):
|
|
'''Predefined content type'''
|
|
objects = ContentManager()
|
|
|
|
description = CharField(max_length=128, unique=True)
|
|
|
|
def __unicode__(self):
|
|
return self.description
|
|
|
|
class Meta:
|
|
ordering = ['description']
|
|
verbose_name = _('Content')
|
|
verbose_name_plural = _('Contents')
|
|
|
|
|
|
def username(user):
|
|
'''Return the full name of a user if it has one, the username otherwise.'''
|
|
return user.get_full_name() or user.username
|
|
|
|
|
|
def all_emails(user):
|
|
emails = []
|
|
if user.email:
|
|
emails.append(user.email)
|
|
try:
|
|
if user.docbowprofile.personal_email:
|
|
emails.append(user.docbowprofile.personal_email)
|
|
except DocbowProfile.DoesNotExist:
|
|
pass
|
|
return emails
|
|
|
|
|
|
def generate_filename(instance, filename):
|
|
'''Generate an unique filename for storing an uploaded file'''
|
|
now = dt.date.today()
|
|
# Keep all underscores and points
|
|
parts = filename.split('.')
|
|
parts = map(slugify, parts)
|
|
filename = '.'.join(parts)
|
|
return os.path.join('files', now.isoformat(), '%d_%s' % (random.randint(0, 10000000), filename))
|
|
|
|
|
|
FORWARD_PERMISSION = 'FORWARD_DOCUMENT'
|
|
|
|
|
|
class DocumentManager(Manager):
|
|
use_for_related_fields = True
|
|
def get_query_set(self):
|
|
'''Prefetch as much as possible.'''
|
|
return super(DocumentManager, self).get_query_set() \
|
|
.select_related() \
|
|
.prefetch_related('attached_files')
|
|
|
|
|
|
class Document(Model):
|
|
'''
|
|
Represent a file sent between a user and some targets, user or groups.
|
|
'''
|
|
|
|
objects = DocumentManager()
|
|
|
|
class Meta:
|
|
ordering = ['-date']
|
|
verbose_name = _('Document')
|
|
verbose_name_plural = _('Documents')
|
|
permissions = (
|
|
(FORWARD_PERMISSION, _("Can forward documents")),
|
|
)
|
|
|
|
|
|
sender = ForeignKey(User, verbose_name=_('Sender'),
|
|
related_name='documents_sent')
|
|
real_sender = CharField(max_length=64, blank=True, verbose_name=_('Real sender'))
|
|
date = DateTimeField(default=now,
|
|
verbose_name=_("Date d'envoi"))
|
|
to_user = ManyToManyField(User, related_name='directly_received_documents',
|
|
blank=True, null=True, verbose_name=_('Users to send to'))
|
|
to_list = ManyToManyField('MailingList', blank=True, null=True,
|
|
verbose_name=_('Groups to send to'))
|
|
filetype = ForeignKey(FileType, verbose_name=_('Document type'),
|
|
limit_choices_to={'is_active': True})
|
|
comment = TextField(blank=True, verbose_name=_('Comments'))
|
|
_timestamp = TextField(blank=True)
|
|
reply_to = ForeignKey('self', verbose_name=_('Reply to'),
|
|
blank=True, null=True, related_name='replies')
|
|
|
|
def __unicode__(self):
|
|
'''Return a displayable representation of the document sending.'''
|
|
ctx = {
|
|
'id': self.id,
|
|
'date': self.date.date().isoformat(),
|
|
'filetype': self.filetype,
|
|
'comment': self.comment,
|
|
'sender': username(self.sender),
|
|
}
|
|
return _('document {id} sent on {date} of type {filetype} about {comment} by {sender}').format(**ctx)
|
|
|
|
def filenames(self):
|
|
'''Returns a display string for the list of attached files'''
|
|
files = [ attached_file for attached_file in self.attached_files.all() ]
|
|
return ', '.join([f.filename() for f in files])
|
|
filenames.short_description = _('Attached files')
|
|
|
|
def filename_links(self):
|
|
'''Returns a display string containing links to download attached
|
|
files, only usable in the admin application.'''
|
|
links = []
|
|
qs = self.attached_files.all()
|
|
qs = qs.order_by('kind__position', 'kind__name', 'id')
|
|
last_kind_name = ''
|
|
for attached_file in qs:
|
|
kind = attached_file.kind
|
|
name = attached_file.filename()
|
|
url = attached_file.link()
|
|
links.append(u'<a href="%s">%s</a>' % (url, name))
|
|
if kind and kind.name != last_kind_name:
|
|
links[-1] = kind.name + ' : ' + links[-1]
|
|
last_kind_name = kind.name
|
|
return ', '.join(links)
|
|
filename_links.short_description = _('Attached files')
|
|
filename_links.allow_tags = True
|
|
|
|
def user_human_to(self):
|
|
'''Return a sorted list of display names for user recipients.'''
|
|
return sorted(map(username, self.to_user.all()))
|
|
|
|
def group_human_to(self):
|
|
'''Return a sorted list of display names for list recipients.'''
|
|
return sorted(map(unicode, self.to_list.all()))
|
|
|
|
def human_to(self):
|
|
'''Return a sorted list of display names for all recipients.'''
|
|
return sorted(map(username, self.to_user.all()) + \
|
|
map(unicode, self.to_list.all()))
|
|
|
|
def recipients(self):
|
|
'''Return a comma separated sorted list of display names for all
|
|
recipients.
|
|
'''
|
|
return ', '.join(self.human_to())
|
|
recipients.short_description = _('Recipients')
|
|
|
|
def to(self):
|
|
'''Returns the list of all user recipients direct or through a list.'''
|
|
recipients = set()
|
|
recipients.update(self.to_user.all())
|
|
for mailing_list in self.to_list.all():
|
|
recipients.update(mailing_list.recursive_members())
|
|
return recipients
|
|
|
|
def to_with_origin(self):
|
|
'''Returns the set of recipients with their origin'''
|
|
recipients = defaultdict(lambda: set())
|
|
for mailing_list in self.to_list.all():
|
|
recipients.update(mailing_list.recursive_members_with_origin())
|
|
for user in self.to_user.all():
|
|
recipients[user].add('--direct--')
|
|
return recipients
|
|
|
|
def delivered_to(self):
|
|
'''Returns the list of user which received the document.
|
|
|
|
It can differ from ``to()`` if the members of a recipient
|
|
mailing-list changed.
|
|
'''
|
|
return User.objects.all(documents__document=self,
|
|
documents__outbox=False)
|
|
|
|
def timestamp(self, to=None):
|
|
if not self._timestamp:
|
|
blob = self.timestamp_blob(to=to)
|
|
self._timestamp = timestamp.timestamp_json(blob)
|
|
self.save()
|
|
django_journal.record('timestamp', 'timestamped document {document} result is {timestamp}',
|
|
document=self, timestamp=self._timestamp)
|
|
return self._timestamp
|
|
|
|
def post(self, forward=True):
|
|
'''Deliver the document into inbox of all active users recipients and into
|
|
outbox of the sender.
|
|
'''
|
|
to_with_origins = self.to_with_origin()
|
|
to = to_with_origins.keys()
|
|
# Create the timestamp
|
|
try:
|
|
self.timestamp(to=to)
|
|
except timestamp.TimestampingError, e:
|
|
django_journal.record('error', 'unable to timestamp {document}: '
|
|
'{exception}', document=self, exception=str(e))
|
|
# Record recipient lists
|
|
for mailing_list in self.to_list.all():
|
|
django_journal.record('delivery', 'deliver document {document} to members of list {mailing_list}',
|
|
document=self, mailing_list=mailing_list)
|
|
for user in to:
|
|
if user.is_active:
|
|
Mailbox.objects.get_or_create(owner=user, document=self)
|
|
if '--direct--' not in to_with_origins[user] or \
|
|
len(to_with_origins[user]) > 1:
|
|
lists = u', '.join(m.name for m in to_with_origins[user] if m != '--direct--')
|
|
if '--direct--' in to_with_origins[user]:
|
|
django_journal.record('delivery', 'deliver document '
|
|
'{document} in mailbox of user {recipient} '
|
|
'member of {lists} and as a direct recipient',
|
|
document=self, recipient=user, lists=lists)
|
|
else:
|
|
django_journal.record('delivery', 'deliver document '
|
|
'{document} in mailbox of user {recipient} '
|
|
'member of {lists}',
|
|
document=self, recipient=user, lists=lists)
|
|
else:
|
|
django_journal.record('delivery', 'deliver document {document} in mailbox of user {recipient} as a direct recipient',
|
|
document=self, recipient=user)
|
|
# Deliver to ouput mailbox of the sender
|
|
Mailbox.objects.get_or_create(owner=self.sender, outbox=True, document=self)
|
|
django_journal.record('delivery', 'deliver document {document} in output mailbox of user {recipient}',
|
|
document=self, recipient=self.sender)
|
|
# Push notifications
|
|
Notification.objects.notify(document=self, users=to)
|
|
if forward:
|
|
AutomaticForwarding.try_forwarding(self)
|
|
return len(to)
|
|
|
|
def forward(self, new_sender, lists, users, automatic=False):
|
|
'''Forward the document to new recipients.
|
|
|
|
new_sender - new sender
|
|
lists - new mailing lists recipients
|
|
users - new users recipients
|
|
automatic - whether this forwarding is the result of an
|
|
automatic treatment
|
|
'''
|
|
document = Document.objects.create(sender=new_sender,
|
|
filetype=self.filetype,
|
|
comment=self.comment)
|
|
document.to_user = users
|
|
document.to_list = lists
|
|
attached_files = [ AttachedFile(name=attached_file.name,
|
|
content=attached_file.content,
|
|
document=document) for attached_file in self.attached_files.all()]
|
|
AttachedFile.objects.bulk_create(attached_files)
|
|
recipients_count = document.post(forward=False)
|
|
return recipients_count, DocumentForwarded.objects.create(from_document=self,
|
|
to_document=document, automatic=automatic)
|
|
|
|
def timestamp_blob(self, to=None):
|
|
'''Create a dictionary containing information to timestamp.
|
|
|
|
It should be serialized (ex. using JSON) and cryptographically
|
|
timestamped (ex. using RFC3161).
|
|
'''
|
|
to = to or self.to()
|
|
blob = {}
|
|
blob['from'] = username(self.sender)
|
|
blob['date'] = self.date.isoformat()
|
|
blob['to'] = ', '.join(map(username, to))
|
|
blob['filetype'] = unicode(self.filetype)
|
|
blob['comment'] = self.comment
|
|
blob['files'] = []
|
|
for f in self.attached_files.all():
|
|
d = dict(name=f.filename(), size=f.content.size,
|
|
digest=hashlib.sha1(f.content.read()).hexdigest())
|
|
blob['files'].append(d)
|
|
return blob
|
|
|
|
def url(self):
|
|
return app_settings.BASE_URL + reverse('inbox-by-document-message',
|
|
kwargs=dict(document_id=self.id))
|
|
|
|
|
|
class DeletedMailbox(Model):
|
|
'''Holds deletion events for delegates on mailbox entries of their delegator.'''
|
|
mailbox = ForeignKey('Mailbox')
|
|
delegate = ForeignKey('auth.User')
|
|
|
|
|
|
class DocumentForwarded(Model):
|
|
'''Model to store tags applied to models.
|
|
|
|
First use will be to mark message as having been forwarded.
|
|
'''
|
|
from_document = ForeignKey(Document, related_name='document_forwarded_to')
|
|
to_document = ForeignKey(Document, related_name='document_forwarded_from')
|
|
date = DateTimeField(auto_now_add=True)
|
|
automatic = BooleanField(default=False)
|
|
|
|
def __unicode__(self):
|
|
if self.automatic:
|
|
return _(u'forwarded document {from_document} as {to_document} on {date} automatically').format(
|
|
from_document=self.from_document,
|
|
to_document=self.to_document,
|
|
date=self.date)
|
|
else:
|
|
return _(u'forwarded document {from_document} as {to_document} on {date}').format(
|
|
from_document=self.from_document,
|
|
to_document=self.to_document,
|
|
date=self.date)
|
|
|
|
|
|
def list_to_csv(l, mapping_func=None):
|
|
'''Convert a list to a comma separated string of its unicode values.
|
|
|
|
A mapping_func function can be passed to transform the list prior to the
|
|
formatting.
|
|
'''
|
|
if mapping_func:
|
|
l = map(mapping_func, l)
|
|
return u', '.join(map(unicode, l))
|
|
|
|
|
|
class AutomaticForwarding(Model):
|
|
'''
|
|
Choice of sender and filetype to transfer mail automatically to a list
|
|
of recipients.
|
|
'''
|
|
filetypes = ManyToManyField(FileType, related_name='forwarding_rules',
|
|
verbose_name=_('filetype'),
|
|
limit_choices_to={'is_active': True})
|
|
originaly_to_user = ManyToManyField(User, related_name='as_original_recipient_forwarding_rules',
|
|
blank=True, null=True, verbose_name=_('Original recipients'),
|
|
help_text=_('At least one recipient must match for the rule to '
|
|
'apply.'))
|
|
forward_to_user = ManyToManyField(User, related_name='as_recipient_forwarding_rules',
|
|
blank=True, null=True, verbose_name=_('Users to forward to'))
|
|
forward_to_list = ManyToManyField('MailingList', blank=True, null=True,
|
|
verbose_name=_('Groups to forward to'), related_name='as_recipient_forwarding_rules')
|
|
|
|
def __unicode__(self):
|
|
'''Return a display string for the forwarding rule.'''
|
|
ctx = {
|
|
'filetypes': list_to_csv(self.filetypes.all()),
|
|
'orginaly_to_user': list_to_csv(map(username, self.originaly_to_user.all())),
|
|
'to': list_to_csv(map(username, self.forward_to_user.all())
|
|
+ list(self.forward_to_list.all())) }
|
|
assert self.filetypes.all() or self.originaly_to_user.all()
|
|
if self.filetypes.all() and self.originaly_to_user.all():
|
|
tpl = _('Forward documents of type {filetypes} and to {originaly_to_user} automatically to {to}')
|
|
elif self.filetypes.all():
|
|
tpl = _('Forward documents of type {filetypes} automatically to {to}')
|
|
else:
|
|
tpl = _('Forward documents to {originaly_to_user} automatically to {to}')
|
|
return tpl.format(**ctx)
|
|
|
|
@classmethod
|
|
def try_forwarding(self, document):
|
|
'''Try applying matching rules to the document'''
|
|
for rule in self.objects.filter(Q(filetypes=document.filetype,
|
|
originaly_to_user__in=document.to())|
|
|
Q(filetypes=document.filetype, originaly_to_user__isnull=True)):
|
|
recipients_count, document_forwarded = document.forward(document.sender,
|
|
users=rule.forward_to_user.all(),
|
|
lists=rule.forward_to_list.all(),
|
|
automatic=True)
|
|
django_journal.record('automatic-forward', 'document {document} '
|
|
'automatically forwarded to {recipients_count} new recipients '
|
|
'as document {new_document}', document=document_forwarded.from_document,
|
|
new_document=document_forwarded.to_document,
|
|
recipients_count=recipients_count,
|
|
document_forwarded=document_forwarded)
|
|
|
|
class Meta:
|
|
verbose_name = _('Automatic forwarding rule')
|
|
verbose_name_plural = _('Automatic forwarding rules')
|
|
|
|
|
|
class AttachedFile(Model):
|
|
'''Uploaded file attached to a Document'''
|
|
name = CharField(max_length=128, verbose_name=_('Name'))
|
|
content = FileField(upload_to=generate_filename, verbose_name=_('File'))
|
|
document = ForeignKey(Document, verbose_name=_('Attached to'),
|
|
related_name='attached_files')
|
|
kind = ForeignKey('FileTypeAttachedFileKind', blank=True, null=True,
|
|
verbose_name=_('attached file kind'))
|
|
|
|
def filename(self):
|
|
'''Extract the original filename from generated unique filename for
|
|
storage.
|
|
|
|
It remove the part after the last underscore in the name.
|
|
'''
|
|
filename = os.path.basename(self.content.name)
|
|
try:
|
|
prefix, true_filename = filename.split('_', 1)
|
|
if prefix.isdigit():
|
|
return true_filename
|
|
except:
|
|
pass
|
|
|
|
return filename
|
|
|
|
def link(self):
|
|
'''Returns the link for downloading the attached file in the admin
|
|
application.
|
|
'''
|
|
return '/admin/docbow/attachedfile/%s/download/' % self.id
|
|
|
|
def __unicode__(self):
|
|
return self.name
|
|
|
|
def is_guest(user):
|
|
try:
|
|
return user.docbowprofile.is_guest
|
|
except DocbowProfile.DoesNotExist:
|
|
return False
|
|
|
|
|
|
class Delegation(Model):
|
|
'''
|
|
Delegate account, managable by user themselves.
|
|
'''
|
|
class Meta:
|
|
ordering = [ 'by' ]
|
|
verbose_name = _('Account delegation')
|
|
verbose_name_plural = _('Account delegations')
|
|
db_table = 'auth_delegation'
|
|
unique_together = (('by', 'to'),)
|
|
by = ForeignKey(User, related_name='delegations_to',
|
|
verbose_name=pgettext_lazy('delegation from', "From"))
|
|
to = ForeignKey(User, related_name='delegations_by',
|
|
verbose_name=pgettext_lazy('delegation to', "To"))
|
|
|
|
def __unicode__(self):
|
|
return u'delegation from {0}:{0.id} to {1}:{1.id}'.format(self.by, self.to)
|
|
|
|
@property
|
|
def guest_delegate(self):
|
|
try:
|
|
return self.to.docbowprofile.is_guest
|
|
except:
|
|
return False
|
|
|
|
|
|
class MailingListManager(GetByNameManager):
|
|
def active(self):
|
|
return self.filter(is_active=True)
|
|
|
|
def is_member_of(self, user):
|
|
lists = set(MailingList.objects.filter(members=user))
|
|
count = len(lists)
|
|
while True: # accumulate lists until it grows no more
|
|
lists |= set(MailingList.objects.filter(mailing_list_members__in=lists))
|
|
if count == len(lists):
|
|
break
|
|
count = len(lists)
|
|
return lists
|
|
|
|
|
|
class MailingList(NameNaturalKey, Model):
|
|
'''A list of recipients.'''
|
|
class Meta:
|
|
ordering = [ 'name' ]
|
|
verbose_name = _('Mailing list')
|
|
verbose_name_plural = _('Mailing lists')
|
|
name = CharField(max_length=128, verbose_name=_('Name'))
|
|
members = ManyToManyField(User, verbose_name=_('Members'), blank=True,
|
|
null=True, related_name='mailing_lists')
|
|
mailing_list_members = ManyToManyField('MailingList',
|
|
verbose_name=_('Mailing lists members'),
|
|
blank=True, related_name='members_lists')
|
|
is_active = BooleanField(verbose_name=_('is active'), blank=True,
|
|
default=True)
|
|
|
|
objects = MailingListManager()
|
|
|
|
def recursive_members(self, sublist_traversed=None):
|
|
'''Traverse this list and all its recursive sublist and accumulate
|
|
members.'''
|
|
if sublist_traversed is None:
|
|
sublist_traversed = set()
|
|
members = set()
|
|
members |= set(self.members.all())
|
|
for sublist in self.mailing_list_members.all():
|
|
if sublist not in sublist_traversed:
|
|
sublist_traversed.add(sublist)
|
|
members |= sublist.recursive_members(sublist_traversed)
|
|
return members
|
|
|
|
def recursive_members_with_origin(self, sublist_traversed=None):
|
|
'''Traverse this list and all its recursive sublist and accumulate
|
|
members and their origin.'''
|
|
if sublist_traversed is None:
|
|
sublist_traversed = defaultdict(lambda:0)
|
|
sublist_traversed[self] += 1
|
|
members = defaultdict(lambda:set())
|
|
for member in self.members.all():
|
|
members[member].add(self)
|
|
for sublist in self.mailing_list_members.all():
|
|
if sublist_traversed[sublist] < 3:
|
|
sub_members = sublist.recursive_members_with_origin(
|
|
sublist_traversed).iteritems()
|
|
for member, list_set in sub_members:
|
|
members[member] |= list_set
|
|
members[member].add(self)
|
|
return members
|
|
|
|
def __unicode__(self):
|
|
return self.name
|
|
|
|
|
|
class Mailbox(Model):
|
|
'''List of document received by a user'''
|
|
owner = ForeignKey(User, verbose_name=_('Mailbox owner'),
|
|
related_name='documents')
|
|
document = ForeignKey(Document, verbose_name=('Document'),
|
|
related_name='mailboxes')
|
|
seen = BooleanField(verbose_name=_('Seen'), blank=True)
|
|
deleted = BooleanField(verbose_name=_('Deleted'), blank=True)
|
|
outbox = BooleanField(verbose_name=_('Outbox message'), blank=True,
|
|
default=False)
|
|
date = DateTimeField(auto_now_add=True)
|
|
|
|
class Meta:
|
|
ordering = [ '-date' ]
|
|
verbose_name = _('Mailbox')
|
|
verbose_name_plural = _('Mailboxes')
|
|
|
|
def __unicode__(self):
|
|
if self.seen and self.deleted:
|
|
return _(u'seen and deleted mailbox entry {id} of user {user}:{user.id} created on '
|
|
u'{date} for {document}').format(id=self.id, user=self.owner,
|
|
date=self.date,
|
|
document=self.document)
|
|
elif self.deleted:
|
|
return _(u'deleted mailbox entry {id} of user {user}:{user.id} created on '
|
|
u'{date} for {document}').format(id=self.id, user=self.owner,
|
|
date=self.date,
|
|
document=self.document)
|
|
elif self.seen:
|
|
return _(u'seen mailbox entry {id} of user {user}:{user.id} created on '
|
|
u'{date} for {document}').format(id=self.id, user=self.owner,
|
|
date=self.date,
|
|
document=self.document)
|
|
else:
|
|
return _(u'mailbox entry {id} of user {user}:{user.id} created on '
|
|
u'{date} for {document}').format(id=self.id, user=self.owner,
|
|
date=self.date,
|
|
document=self.document)
|
|
|
|
class DocbowUser(User):
|
|
class Meta:
|
|
verbose_name = _('Docbow admin user')
|
|
proxy = True
|
|
app_label = 'auth'
|
|
|
|
|
|
class DocbowGroup(Group):
|
|
class Meta:
|
|
verbose_name = _('Docbow admin group')
|
|
app_label = 'auth'
|
|
proxy = True
|
|
|
|
|
|
class Inbox(Mailbox):
|
|
class Meta:
|
|
proxy = True
|
|
verbose_name = _('Inbox')
|
|
verbose_name_plural = _('Inboxes')
|
|
|
|
|
|
class Outbox(Mailbox):
|
|
class Meta:
|
|
proxy = True
|
|
verbose_name = _('Outbox')
|
|
verbose_name_plural = _('Outboxes')
|
|
|
|
|
|
class SendingLimitation(Model):
|
|
mailing_list = OneToOneField(MailingList, unique=True,
|
|
verbose_name=MailingList._meta.verbose_name)
|
|
filetypes = ManyToManyField(FileType, blank=True,
|
|
related_name='filetype_limitation',
|
|
verbose_name=_('Limitation des types de fichier'),
|
|
limit_choices_to={'is_active': True})
|
|
lists = ManyToManyField(MailingList, related_name='lists_limitation',
|
|
verbose_name=_('Limitation des destinataires'))
|
|
|
|
class Meta:
|
|
verbose_name = _('Limitation par liste de destinataires')
|
|
verbose_name = _('Limitation par liste de destinataires')
|
|
verbose_name_plural = verbose_name
|
|
|
|
def __unicode__(self):
|
|
return _(u'sending limitation for list {mailing_list} to filetypes'
|
|
u'{filetypes} and lists {lists}').format(
|
|
mailing_list=self.mailing_list,
|
|
filetypes=list_to_csv(self.filetypes.all()),
|
|
lists=list_to_csv(self.lists.all()))
|
|
|
|
|
|
class DocbowProfile(Model):
|
|
'''Hold extra user attributes'''
|
|
user = OneToOneField(User, unique=True)
|
|
is_guest = BooleanField(verbose_name=_('Guest user'), blank=True)
|
|
mobile_phone = CharField(max_length=32, verbose_name=_('Mobile phone'),
|
|
blank=True, validators=[validate_phone])
|
|
personal_email = EmailField(_('personal email address'), blank=True,
|
|
help_text=_('if you provide a personal email address, notifications '
|
|
'of new documents will also be sent to this address.'))
|
|
|
|
def __unicode__(self):
|
|
return _(u'docbow profile of {user}:{user.id} with mobile phone '
|
|
u'{mobile_phone} and personal email {personal_email}').format(
|
|
user=self.user,
|
|
mobile_phone=self.mobile_phone,
|
|
personal_email=self.personal_email)
|
|
|
|
|
|
class NotificationManager(Manager):
|
|
def notify(self, document=None, users=None, kind='new-document', ctx=None):
|
|
'''Build notifications in bulk'''
|
|
notifications = [Notification(user=user, document=document, kind=kind, ctx=ctx)
|
|
for user in users]
|
|
self.bulk_create(notifications)
|
|
|
|
|
|
class Notification(Model):
|
|
'''Asynchronous notification
|
|
|
|
create_dt - date and time of creation
|
|
document - the related document
|
|
user - the recipient of the notification
|
|
kind - the kind of notification
|
|
done - notification has been done successfully or not
|
|
failure - report of the last failure, if done is True, the notification
|
|
was successful if this is empty.
|
|
ctx - a pickled object
|
|
'''
|
|
objects = NotificationManager()
|
|
|
|
create_dt = DateTimeField(auto_now_add=True)
|
|
document = ForeignKey(Document, blank=True, null=True)
|
|
user = ForeignKey(User, blank=True, null=True)
|
|
kind = CharField(max_length=32, default='new-document')
|
|
done = BooleanField(blank=True, default=False)
|
|
failure = TextField(blank=True, null=True)
|
|
ctx = PickledObjectField(blank=True, null=True)
|
|
|
|
def __unicode__(self):
|
|
return _(u'notification {0}:{1}').format(self.kind, self.id)
|
|
|
|
class Meta:
|
|
ordering = ('-id',)
|
|
|
|
def guest_users():
|
|
return User.objects.filter(docbowprofile__is_guest=True)
|
|
|
|
def non_guest_users():
|
|
return User.objects.exclude(docbowprofile__is_guest=True)
|
|
|
|
# signals
|
|
|
|
from django.db.models.signals import m2m_changed
|
|
from django.dispatch import receiver
|
|
|
|
|
|
@receiver(m2m_changed, sender=User.groups.through)
|
|
def groups_changed(sender, instance, action, **kwargs):
|
|
'''When a user get a group, give it access to the administration.'''
|
|
if action.startswith('post'):
|
|
instance.is_staff = instance.groups.exists()
|
|
instance.save()
|
|
|
|
# do not remove me
|
|
# pylint: disable=W0611
|
|
import signals
|