# docbow/docbow_project/docbow/models.py
#
# 942 lines
# 32 KiB
# Python

import os
import datetime as dt
import itertools
import random
import hashlib
import html
import re
from collections import defaultdict
import time
from django.db.models import (
Model,
ForeignKey,
DateTimeField,
CharField,
FileField,
ManyToManyField,
TextField,
Manager,
BooleanField,
OneToOneField,
Q,
EmailField,
PositiveSmallIntegerField,
CASCADE,
PositiveIntegerField,
)
from django.contrib.auth.models import User, Group
from django.conf import settings
from django.template.defaultfilters import slugify
from django.utils.translation import ugettext_lazy as _, pgettext_lazy
from django.urls import reverse
from picklefield.fields import PickledObjectField
from django.utils.html import strip_tags
from django.utils.timezone import now, utc
from django.forms import ValidationError
from django.utils.encoding import force_text, python_2_unicode_compatible
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.safestring import mark_safe
from django_journal import journal as django_journal
from docbow_project.docbow.validators import validate_phone
from docbow_project.docbow.utils import file_match_mime_types
from docbow_project.docbow import app_settings
# Translatable application labels.  None of these names is referenced again in
# the code visible here -- presumably they only exist so that the strings are
# picked up for translation; TODO confirm.
# NOTE(review): DOCBOW_APP is assigned twice in a row, so only the second
# value is kept at runtime.
DOCBOW_APP = _('docbow')
DOCBOW_APP = _('Docbow_App')
DOCBOW_APP2 = _('Docbow_app')
# Patch the options of the auth User model so that users are ordered by
# username by default.
User._meta.ordering = ['username']
def ellipsize(text, length=50):
    """Return ``text`` without markup, shortened when it is too long.

    Tags are stripped and HTML entities are decoded first.  A result strictly
    shorter than ``length`` is returned unchanged; otherwise only its first
    ``length - 10`` characters are kept and ``'...'`` is appended.
    """
    plain = html.unescape(strip_tags(text))
    if len(plain) >= length:
        return plain[: length - 10] + '...'
    return plain
class GetByNameManager(Manager):
    """Manager resolving an object from its natural key, i.e. its name.

    The lookup uses ``get(name=...)``, so the name field MUST be unique.
    """

    def get_by_natural_key(self, name):
        """Return the object whose ``name`` field equals ``name``."""
        lookup = {'name': name}
        return self.get(**lookup)
class ContentManager(Manager):
    """Manager resolving an object from its natural key, i.e. its description.

    The lookup uses ``get(description=...)``, so the description field MUST
    be unique.
    """

    def get_by_natural_key(self, description):
        """Return the object whose ``description`` field equals ``description``."""
        lookup = {'description': description}
        return self.get(**lookup)
class NameNaturalKey(object):
    """Mixin exposing the ``name`` attribute of a model as its natural key.

    The name field MUST be unique.
    """

    def natural_key(self):
        """Return the natural key as a one-element tuple holding the name."""
        key = (self.name,)
        return key
@python_2_unicode_compatible
class FileType(NameNaturalKey, Model):
    """A type of file that can be sent inside the application."""

    # Natural-key lookups go through the (unique) name.
    objects = GetByNameManager()

    name = CharField(
        max_length=128,
        unique=True,
    )
    is_active = BooleanField(
        verbose_name=_('is active'),
        default=True,
        blank=True,
    )
    extra_senders = PositiveIntegerField(
        verbose_name=_('Extra senders'),
        default=0,
    )

    class Meta:
        verbose_name = _('File type')
        verbose_name_plural = _('File types')
        ordering = ['name']

    def __str__(self):
        """Display a file type by its name."""
        return self.name
class FileTypeAttachedFileKindManager(Manager):
    """Manager resolving attached file kinds from their natural key."""

    def get_by_natural_key(self, name, file_type_name):
        """Return the kind named ``name`` belonging to the file type named
        ``file_type_name``."""
        lookup = {'name': name, 'file_type__name': file_type_name}
        return self.get(**lookup)
@python_2_unicode_compatible
class FileTypeAttachedFileKind(Model):
    """A kind of attached file accepted by a file type.

    A kind restricts the mime types of the files and how many of them can or
    must be provided.
    """

    # Empty string, or a whitespace separated list of ``<family>/<subtype>``
    # entries where the subtype is either ``*`` or made of lowercase letters,
    # dashes and dots.  Every entry accepts the same subtype characters: the
    # first entry used to reject dots (e.g. ``application/vnd.ms-excel``)
    # although the following ones accepted them.
    MIME_TYPES_RE = re.compile(
        r'^\s*(?:(?:text|image|audio|application|video)'
        r'/(?:\*|[a-z.-]+)(?:\s+(?:text|image|audio|application|video)'
        r'/(?:\*|[a-z.-]+))*\s*)?$'
    )

    objects = FileTypeAttachedFileKindManager()

    name = CharField(max_length=128, verbose_name=_('name'))
    file_type = ForeignKey('FileType', verbose_name=_('document type'), on_delete=CASCADE)
    mime_types = TextField(
        verbose_name=_('mime types'),
        # Marked for translation like every other user visible string here.
        help_text=_('mime types separated by spaces, wildcards are allowed'),
        blank=True,
    )
    cardinality = PositiveSmallIntegerField(
        default=0, verbose_name=_('cardinality'), help_text=_('zero is a special value setting no limitation')
    )
    minimum = PositiveSmallIntegerField(default=0, verbose_name=_('minimum number of files'))
    position = PositiveSmallIntegerField(verbose_name=_('position'))

    class Meta:
        ordering = ('file_type', 'position', 'name')
        unique_together = (('name', 'file_type'),)
        verbose_name = _('file type attached file kind')
        verbose_name_plural = _('file type attached file kinds')

    def clean(self):
        """Validate the file count bounds and the mime types list.

        Raises ValidationError when a non-zero cardinality is lower than the
        minimum, or when the mime types do not match MIME_TYPES_RE.
        """
        # A cardinality of zero means "no limitation", nothing to compare to.
        if self.cardinality != 0 and self.minimum > self.cardinality:
            raise ValidationError(
                _('minimum must be inferior to maximum number of files if maximum is not zero')
            )
        if self.mime_types and not self.MIME_TYPES_RE.match(self.mime_types):
            raise ValidationError(_('invalid mime types list'))

    def get_mime_types(self):
        """Return the list of configured mime types, empty when none is set."""
        return self.mime_types.split()

    def match_file(self, file_like):
        """Return the result of matching ``file_like`` against the mime types
        of this kind, as computed by ``file_match_mime_types``."""
        return file_match_mime_types(file_like, self.get_mime_types(), app_settings.MIME_BUFFER_SIZE)

    def natural_key(self):
        """Return the natural key: the kind name and its file type name."""
        return (self.name, self.file_type.name)

    def __str__(self):
        return self.name
@python_2_unicode_compatible
class Content(Model):
    """Predefined content type, identified by its unique description."""

    # Natural-key lookups go through the description.
    objects = ContentManager()

    description = CharField(
        max_length=128,
        unique=True,
    )

    class Meta:
        verbose_name = _('Content')
        verbose_name_plural = _('Contents')
        ordering = ['description']

    def __str__(self):
        """Display a content by its description."""
        return self.description
def username(user):
    """Return a display name for ``user``.

    When the user has a first name or a last name the result is
    "<last name> <first name>", otherwise it is the username.
    """
    if not (user.first_name or user.last_name):
        return user.username
    # Defaults guarantee both keys exist even if the instance lacks one.
    fields = {'last_name': '', 'first_name': ''}
    fields.update(user.__dict__)
    return u'{last_name} {first_name}'.format(**fields)
def all_emails(user):
    """Return the list of non-empty email addresses known for ``user``.

    The account email comes first, followed by the personal email of the
    docbow profile when the user has a profile.
    """
    found = [user.email] if user.email else []
    try:
        personal = user.docbowprofile.personal_email
    except DocbowProfile.DoesNotExist:
        # No profile: only the account email can be returned.
        return found
    if personal:
        found.append(personal)
    return found
def generate_filename(instance, filename):
    """Build the storage path of an uploaded file.

    The path is ``files/<today as ISO date>/<random integer>_<name>`` where
    the name is ``filename`` with each dot separated part slugified.
    ``instance`` is not used.
    """
    today = dt.date.today()
    # Slugify part by part so that the dots of the name are kept.
    slugified = '.'.join(slugify(part) for part in filename.split('.'))
    stored_name = '%d_%s' % (random.randint(0, 10000000), slugified)
    return os.path.join('files', today.isoformat(), stored_name)
# Codename of the custom "Can forward documents" permission declared in the
# Meta options of the Document model.
FORWARD_PERMISSION = 'FORWARD_DOCUMENT'
class DocumentManager(Manager):
    """Manager of documents loading related objects eagerly."""

    def get_queryset(self):
        '''Prefetch as much as possible.

        Return the base queryset with ``select_related()`` applied and the
        attached files prefetched.
        '''
        # The ORM hook is named ``get_queryset``: under the former
        # ``get_query_set`` spelling this method was never called by Django
        # and the call to the parent method could not succeed.
        return super(DocumentManager, self).get_queryset().select_related().prefetch_related('attached_files')

    # Former spelling, kept so that explicit callers keep working.
    get_query_set = get_queryset
@python_2_unicode_compatible
class Document(Model):
    """
    Represent a file sent between a user and some targets, user or groups.
    """

    objects = DocumentManager()

    class Meta:
        ordering = ['-date']
        verbose_name = _('Document')
        verbose_name_plural = _('Documents')
        permissions = ((FORWARD_PERMISSION, _("Can forward documents")),)
        base_manager_name = 'objects'

    sender = ForeignKey(User, verbose_name=_('Sender'), on_delete=CASCADE, related_name='documents_sent')
    real_sender = CharField(max_length=64, blank=True, verbose_name=_('Real sender'))
    extra_senders = ManyToManyField(User, verbose_name=_('Extra senders'))
    date = DateTimeField(default=now, verbose_name=_("Date d'envoi"))
    to_user = ManyToManyField(
        User, related_name='directly_received_documents', blank=True, verbose_name=_('Users to send to')
    )
    to_list = ManyToManyField('MailingList', blank=True, verbose_name=_('Groups to send to'))
    filetype = ForeignKey(
        FileType, verbose_name=_('Document type'), on_delete=CASCADE, limit_choices_to={'is_active': True}
    )
    comment = TextField(blank=True, verbose_name=_('Comments'))
    # Set once by timestamp(), empty until then.
    _timestamp = TextField(blank=True)
    reply_to = ForeignKey(
        'self', verbose_name=_('Reply to'), on_delete=CASCADE, blank=True, null=True, related_name='replies'
    )
    private = BooleanField(
        default=False, verbose_name=_('Private'), help_text=_('delegates cannot see this document')
    )

    def __str__(self):
        '''Return a displayable representation of the document sending.'''
        ctx = {
            'id': self.id,
            'date': self.date.date().isoformat(),
            'filetype': self.filetype,
            'comment': self.comment,
            'sender': username(self.sender),
        }
        return _('document {id} sent on {date} of type {filetype} about {comment} by {sender}').format(**ctx)

    def filenames(self):
        '''Returns a display string for the list of attached files'''
        return ', '.join(attached_file.filename() for attached_file in self.attached_files.all())

    filenames.short_description = _('Attached files')

    def filename_links(self):
        """Returns a display string containing links to download attached
        files, only usable in the admin application.

        Files are ordered by kind; the name of a kind is prepended to the
        first link of each run of files of that kind.
        """
        links = []
        qs = self.attached_files.all()
        qs = qs.order_by('kind__position', 'kind__name', 'id')
        last_kind_name = ''
        for attached_file in qs:
            kind = attached_file.kind
            # The result is marked safe below, so every interpolated value
            # must be escaped here.
            link = u'<a href="%s">%s</a>' % (
                html.escape(attached_file.link()),
                html.escape(attached_file.filename()),
            )
            if kind and kind.name != last_kind_name:
                link = html.escape(kind.name) + '&nbsp;: ' + link
                last_kind_name = kind.name
            links.append(link)
        return mark_safe(', '.join(links))

    filename_links.short_description = _('Attached files')

    def user_human_to(self):
        '''Return a sorted list of display names for user recipients.'''
        return sorted(map(username, self.to_user.all()))

    def group_human_to(self):
        '''Return a sorted list of display names for list recipients.'''
        return sorted(map(force_text, self.to_list.all()))

    def human_to(self):
        '''Return a sorted list of display names for all recipients.'''
        return sorted(list(map(username, self.to_user.all())) + list(map(force_text, self.to_list.all())))

    def recipients(self):
        """Return a comma separated sorted list of display names for all
        recipients.
        """
        return ', '.join(self.human_to())

    recipients.short_description = _('Recipients')

    def to(self):
        '''Returns the set of all user recipients direct or through a list.'''
        recipients = set()
        recipients.update(self.to_user.all())
        for mailing_list in self.to_list.all():
            recipients.update(mailing_list.recursive_members())
        return recipients

    def to_with_origin(self):
        """Returns the recipients with their origin.

        The result maps each recipient to a set containing the mailing lists
        through which the user is reached and/or the ``'--direct--'`` marker
        for direct recipients.
        """
        recipients = defaultdict(set)
        for mailing_list in self.to_list.all():
            # Merge instead of dict.update(): a user reached through several
            # recipient lists must keep the origins of all of them.
            for user, origins in mailing_list.recursive_members_with_origin().items():
                recipients[user] |= origins
        for user in self.to_user.all():
            recipients[user].add('--direct--')
        return recipients

    def delivered_to(self):
        """Returns the list of user which received the document.
        It can differ from ``to()`` if the members of a recipient
        mailing-list changed.
        """
        return [m.owner for m in self.mailboxes.all() if not m.outbox]

    def timestamp(self, to=None):
        """Timestamp the document if not done yet and return the timestamp.

        On the first call the current time is stored in ``_timestamp``, the
        sending date is set from it, the document is saved and the operation
        is journaled.  ``to`` is not used.
        """
        if not self._timestamp:
            self._timestamp = time.time()
            self.date = dt.datetime.fromtimestamp(self._timestamp, utc)
            self.save()
            django_journal.record(
                'timestamp',
                'timestamped document {document} result is {timestamp}',
                document=self,
                timestamp=self._timestamp,
            )
        return self._timestamp

    def _journal_inbox_delivery(self, user, origins):
        '''Journal the delivery of the document in the inbox of ``user``.'''
        lists = u', '.join(m.name for m in origins if m != '--direct--')
        direct = '--direct--' in origins
        if not direct:
            message = 'deliver document {document} in mailbox of user {recipient} member of {lists}'
        elif len(origins) > 1:
            message = (
                'deliver document {document} in mailbox of user {recipient} '
                'member of {lists} and as a direct recipient'
            )
        else:
            # Direct recipient only, there is no list to report.
            django_journal.record(
                'delivery',
                'deliver document {document} in mailbox of user {recipient} as a direct recipient',
                document=self,
                recipient=user,
            )
            return
        django_journal.record('delivery', message, document=self, recipient=user, lists=lists)

    def post(self, forward=True):
        """Deliver the document into inbox of all active users recipients and into
        outbox of the sender.

        forward - whether automatic forwarding rules must be tried

        Returns the number of recipients.
        """
        to_with_origins = self.to_with_origin()
        to = [user for user in to_with_origins.keys() if user.is_active]
        # Create the timestamp
        self.timestamp(to=to)
        # Record recipient lists
        for mailing_list in self.to_list.all():
            django_journal.record(
                'delivery',
                'deliver document {document} to members of list {mailing_list}',
                document=self,
                mailing_list=mailing_list,
            )
        for user in to:
            Mailbox.objects.get_or_create(owner=user, document=self)
            self._journal_inbox_delivery(user, to_with_origins[user])
        # Deliver to output mailbox of the senders
        for sender in itertools.chain([self.sender], self.extra_senders.all()):
            Mailbox.objects.get_or_create(owner=sender, outbox=True, document=self)
            django_journal.record(
                'delivery',
                'deliver document {document} in output mailbox of user {recipient}',
                document=self,
                # Journal the owner of this outbox, which is not always the
                # main sender.
                recipient=sender,
            )
        # Push notifications
        Notification.objects.notify(document=self, users=to)
        if forward:
            AutomaticForwarding.try_forwarding(self)
        return len(to)

    def forward(self, new_sender, lists, users, automatic=False):
        """Forward the document to new recipients.
        new_sender - new sender
        lists - new mailing lists recipients
        users - new users recipients
        automatic - whether this forwarding is the result of an
        automatic treatment

        Returns a pair: the number of recipients of the new document and the
        created DocumentForwarded object.
        """
        document = Document.objects.create(sender=new_sender, filetype=self.filetype, comment=self.comment)
        document.to_user.set(users)
        document.to_list.set(lists)
        attached_files = [
            AttachedFile(name=attached_file.name, content=attached_file.content, document=document)
            for attached_file in self.attached_files.all()
        ]
        AttachedFile.objects.bulk_create(attached_files)
        # The copy is posted without forwarding to prevent forwarding chains.
        recipients_count = document.post(forward=False)
        return (
            recipients_count,
            DocumentForwarded.objects.create(from_document=self, to_document=document, automatic=automatic),
        )

    def sender_display(self):
        '''Return the display name of the sender.'''
        return username(self.sender)

    def extra_senders_display(self):
        '''Return the display names of the extra senders separated by spaces.'''
        return " ".join([username(sender) for sender in self.extra_senders.all()])

    def url(self):
        '''Return the absolute URL of the document built on the base URL setting.'''
        # NOTE(review): the document id is passed as ``mailbox_id`` -- confirm
        # that the 'inbox-message' route expects a document id.
        return urlparse.urljoin(
            app_settings.BASE_URL, reverse('inbox-message', kwargs=dict(mailbox_id=self.id))
        )

    def has_zip_download(self):
        '''Tell whether a ZIP download applies: more than one attached file
        and the ZIP_DOWNLOAD setting enabled.'''
        return self.attached_files.count() > 1 and settings.ZIP_DOWNLOAD
class SeenDocument(Model):
    """Mark a document as seen by a user."""

    document = ForeignKey(
        'Document',
        on_delete=CASCADE,
    )
    user = ForeignKey(
        'auth.User',
        on_delete=CASCADE,
    )

    class Meta:
        ordering = ('-document',)
        verbose_name = _('seen document')
        verbose_name_plural = _('seen documents')
class DeletedDocument(Model):
    """Mark a document as deleted by a user, with soft deletion support."""

    document = ForeignKey(
        'Document',
        on_delete=CASCADE,
    )
    user = ForeignKey(
        'auth.User',
        on_delete=CASCADE,
    )
    soft_delete = BooleanField(
        default=False,
    )
    soft_delete_date = DateTimeField(
        null=True,
    )

    class Meta:
        ordering = ('-document',)
        verbose_name = _('deleted document')
        verbose_name_plural = _('deleted documents')
@python_2_unicode_compatible
class DocumentForwarded(Model):
    """Record that a document was forwarded as another document.

    ``from_document`` is the forwarded document, ``to_document`` the one
    it was forwarded as, ``automatic`` flags the automatic forwardings and
    ``date`` is set when the record is created.
    """

    from_document = ForeignKey(
        Document,
        related_name='document_forwarded_to',
        on_delete=CASCADE,
    )
    to_document = ForeignKey(
        Document,
        related_name='document_forwarded_from',
        on_delete=CASCADE,
    )
    date = DateTimeField(auto_now_add=True)
    automatic = BooleanField(default=False)

    def __str__(self):
        """Describe the forwarding, stating whether it was automatic."""
        if self.automatic:
            template = _(u'forwarded document {from_document} as {to_document} on {date} automatically')
        else:
            template = _(u'forwarded document {from_document} as {to_document} on {date}')
        return template.format(
            from_document=self.from_document,
            to_document=self.to_document,
            date=self.date,
        )
def list_to_csv(l, mapping_func=None):
    """Join the text representations of the items of ``l`` with ``', '``.

    When ``mapping_func`` is given it is applied to each item before the
    conversion to text.
    """
    items = map(mapping_func, l) if mapping_func else l
    return u', '.join(force_text(item) for item in items)
@python_2_unicode_compatible
class AutomaticForwarding(Model):
    """
    Choice of sender and filetype to transfer mail automatically to a list
    of recipients.
    """

    filetypes = ManyToManyField(
        FileType,
        related_name='forwarding_rules',
        verbose_name=_('filetype'),
        limit_choices_to={'is_active': True},
    )
    originaly_to_user = ManyToManyField(
        User,
        related_name='as_original_recipient_forwarding_rules',
        blank=True,
        verbose_name=_('Original recipients'),
        help_text=_('At least one recipient must match for the rule to ' 'apply.'),
    )
    forward_to_user = ManyToManyField(
        User, related_name='as_recipient_forwarding_rules', blank=True, verbose_name=_('Users to forward to')
    )
    forward_to_list = ManyToManyField(
        'MailingList',
        blank=True,
        verbose_name=_('Groups to forward to'),
        related_name='as_recipient_forwarding_rules',
    )

    class Meta:
        verbose_name = _('Automatic forwarding rule')
        verbose_name_plural = _('Automatic forwarding rules')

    def __str__(self):
        '''Return a display string for the forwarding rule.'''
        # Load each relation once, they are used both for the choice of the
        # template and for its content.
        filetypes = list(self.filetypes.all())
        original_recipients = list(self.originaly_to_user.all())
        ctx = {
            'filetypes': list_to_csv(filetypes),
            'originaly_to_user': list_to_csv(map(username, original_recipients)),
            'to': list_to_csv(
                list(map(username, self.forward_to_user.all())) + list(self.forward_to_list.all())
            ),
        }
        if filetypes and original_recipients:
            tpl = _('Forward documents of type {filetypes} and to {originaly_to_user} automatically to {to}')
        elif filetypes:
            tpl = _('Forward documents of type {filetypes} automatically to {to}')
        else:
            # Also reached by a rule with neither file types nor original
            # recipients: displaying an object must not raise.
            tpl = _('Forward documents to {originaly_to_user} automatically to {to}')
        return tpl.format(**ctx)

    @classmethod
    def try_forwarding(cls, document):
        """Try applying matching rules to the document.

        A rule matches when it lists the file type of the document and either
        lists one of the recipients of the document as original recipient or
        lists no original recipient at all.  Each forwarding is journaled.
        """
        rules = cls.objects.filter(
            Q(filetypes=document.filetype, originaly_to_user__in=document.to())
            | Q(filetypes=document.filetype, originaly_to_user__isnull=True)
        ).distinct()  # the joins yield one row per match, apply each rule once
        for rule in rules:
            recipients_count, document_forwarded = document.forward(
                document.sender,
                users=rule.forward_to_user.all(),
                lists=rule.forward_to_list.all(),
                automatic=True,
            )
            django_journal.record(
                'automatic-forward',
                'document {document} '
                'automatically forwarded to {recipients_count} new recipients '
                'as document {new_document}',
                document=document_forwarded.from_document,
                new_document=document_forwarded.to_document,
                recipients_count=recipients_count,
                document_forwarded=document_forwarded,
            )
@python_2_unicode_compatible
class AttachedFile(Model):
    '''Uploaded file attached to a Document'''

    name = CharField(max_length=300, verbose_name=_('Name'))
    content = FileField(max_length=300, upload_to=generate_filename, verbose_name=_('File'))
    document = ForeignKey(
        Document, verbose_name=_('Attached to'), on_delete=CASCADE, related_name='attached_files'
    )
    kind = ForeignKey(
        'FileTypeAttachedFileKind',
        blank=True,
        null=True,
        on_delete=CASCADE,
        verbose_name=_('attached file kind'),
    )

    def filename(self):
        """Extract the original filename from generated unique filename for
        storage.

        The part before the first underscore is removed when it is made of
        digits only; otherwise the base name of the stored file is returned
        unchanged.
        """
        filename = os.path.basename(self.content.name)
        # partition() never fails, no exception needs to be swallowed when
        # the name contains no underscore.
        prefix, separator, true_filename = filename.partition('_')
        if separator and prefix.isdigit():
            return true_filename
        return filename

    def ellipsed_name(self):
        '''Return the name shortened by ellipsize().'''
        return ellipsize(self.name)

    def link(self):
        """Returns the link for downloading the attached file in the admin
        application.
        """
        return '/admin/docbow/attachedfile/%s/download/' % self.id

    def __str__(self):
        return self.name
def is_guest(user):
    """Return the guest flag of the docbow profile of ``user``.

    A user without a docbow profile is not a guest.
    """
    try:
        profile = user.docbowprofile
    except DocbowProfile.DoesNotExist:
        return False
    return profile.is_guest
@python_2_unicode_compatible
class Delegation(Model):
    """
    Delegate account, managable by user themselves.
    """

    by = ForeignKey(
        User,
        related_name='delegations_to',
        on_delete=CASCADE,
        verbose_name=pgettext_lazy('delegation from', "From"),
    )
    to = ForeignKey(
        User,
        related_name='delegations_by',
        on_delete=CASCADE,
        verbose_name=pgettext_lazy('delegation to', "To"),
    )

    class Meta:
        ordering = ['by']
        verbose_name = _('Account delegation')
        verbose_name_plural = _('Account delegations')
        db_table = 'auth_delegation'
        unique_together = (('by', 'to'),)

    def __str__(self):
        return u'delegation from {0}:{0.id} to {1}:{1.id}'.format(self.by, self.to)

    @property
    def guest_delegate(self):
        """Tell whether the delegate is a guest user.

        False is returned when the delegate is missing or has no docbow
        profile.
        """
        try:
            return self.to.docbowprofile.is_guest
        except (User.DoesNotExist, DocbowProfile.DoesNotExist):
            # Only missing related objects mean "not a guest"; any other
            # error is a real problem and must not be hidden.
            return False
class MailingListManager(GetByNameManager):
    """Manager of mailing lists with membership helpers."""

    def active(self):
        """Return the active mailing lists."""
        return self.filter(is_active=True)

    def is_member_of(self, user):
        """Return the set of active lists ``user`` belongs to; see
        are_member_of()."""
        return self.are_member_of([user])

    def are_member_of(self, users):
        """Return the set of active lists having one of ``users`` as member,
        directly or through active lists included in other lists."""
        found = set(MailingList.objects.filter(is_active=True).filter(members__in=users))
        previous_size = -1
        # Add the lists containing the lists found so far until a pass adds
        # nothing new.
        while previous_size != len(found):
            previous_size = len(found)
            parents = MailingList.objects.filter(is_active=True).filter(mailing_list_members__in=found)
            found |= set(parents)
        return found
@python_2_unicode_compatible
class MailingList(NameNaturalKey, Model):
    """A list of recipients, made of users and of other mailing lists."""

    name = CharField(
        max_length=400,
        verbose_name=_('Name'),
    )
    members = ManyToManyField(
        User,
        verbose_name=_('Members'),
        blank=True,
        related_name='mailing_lists',
    )
    mailing_list_members = ManyToManyField(
        'MailingList',
        verbose_name=_('Mailing lists members'),
        blank=True,
        related_name='members_lists',
    )
    is_active = BooleanField(
        verbose_name=_('is active'),
        blank=True,
        default=True,
    )

    objects = MailingListManager()

    class Meta:
        verbose_name = _('Mailing list')
        verbose_name_plural = _('Mailing lists')
        ordering = ['name']

    def recursive_members(self, sublist_traversed=None):
        """Return the set of active users of this list and of its sublists,
        recursively.

        ``sublist_traversed`` is the set of sublists already visited; it is
        shared by the recursive calls so that each sublist is entered once.
        """
        if sublist_traversed is None:
            sublist_traversed = set()
        collected = set(self.members.filter(is_active=True))
        for sublist in self.mailing_list_members.all():
            if sublist in sublist_traversed:
                continue
            sublist_traversed.add(sublist)
            collected.update(sublist.recursive_members(sublist_traversed))
        return collected

    def recursive_members_with_origin(self, sublist_traversed=None):
        """Return a mapping of the active users of this list and of its
        sublists to the set of lists through which they are reached.

        ``sublist_traversed`` counts the visits of each list and is shared by
        the recursive calls; a sublist already visited three times is not
        entered again.
        """
        if sublist_traversed is None:
            sublist_traversed = defaultdict(int)
        sublist_traversed[self] += 1
        origins_by_member = defaultdict(set)
        for member in self.members.filter(is_active=True):
            origins_by_member[member].add(self)
        for sublist in self.mailing_list_members.all():
            if sublist_traversed[sublist] >= 3:
                continue
            nested = sublist.recursive_members_with_origin(sublist_traversed).items()
            for member, origins in nested:
                # Members of a sublist are also reached through this list.
                origins_by_member[member] |= origins
                origins_by_member[member].add(self)
        return origins_by_member

    def __str__(self):
        """Display a mailing list by its name."""
        return self.name
@python_2_unicode_compatible
class Mailbox(Model):
    '''List of document received by a user

    An entry links a document to the user owning the mailbox; ``outbox``
    tells the entries of sent documents from the entries of received ones.
    '''

    owner = ForeignKey(User, verbose_name=_('Mailbox owner'), on_delete=CASCADE, related_name='documents')
    # The label is marked for translation like every other label here; the
    # missing _() call left it untranslated.
    document = ForeignKey(Document, verbose_name=_('Document'), on_delete=CASCADE, related_name='mailboxes')
    outbox = BooleanField(verbose_name=_('Outbox message'), blank=True, default=False, db_index=True)
    date = DateTimeField(auto_now_add=True)

    class Meta:
        ordering = ['-date']
        verbose_name = _('Mailbox')
        verbose_name_plural = _('Mailboxes')

    def __str__(self):
        return _(u'mailbox entry {id} of user {user}:{user.id} created on ' u'{date} for {document}').format(
            id=self.id, user=self.owner, date=self.date, document=self.document
        )
class DocbowUser(User):
    """Proxy of the auth User model registered under the docbow application."""

    class Meta:
        proxy = True
        app_label = 'docbow'
        verbose_name = _('Docbow admin user')
class DocbowGroup(Group):
    """Proxy of the auth Group model registered under the docbow application."""

    class Meta:
        proxy = True
        app_label = 'docbow'
        verbose_name = _('Docbow admin group')
class Inbox(Mailbox):
    """Proxy of Mailbox labelled as an inbox."""

    class Meta:
        verbose_name = _('Inbox')
        verbose_name_plural = _('Inboxes')
        proxy = True
class Outbox(Mailbox):
    """Proxy of Mailbox labelled as an outbox."""

    class Meta:
        verbose_name = _('Outbox')
        verbose_name_plural = _('Outboxes')
        proxy = True
@python_2_unicode_compatible
class SendingLimitation(Model):
    """Limitation attached to a mailing list: a set of file types and a set
    of recipient lists."""

    mailing_list = OneToOneField(
        MailingList,
        unique=True,
        on_delete=CASCADE,
        verbose_name=MailingList._meta.verbose_name,
    )
    filetypes = ManyToManyField(
        FileType,
        blank=True,
        related_name='filetype_limitation',
        verbose_name=_('Limitation des types de fichier'),
        limit_choices_to={'is_active': True},
    )
    lists = ManyToManyField(
        MailingList,
        related_name='lists_limitation',
        verbose_name=_('Limitation des destinataires'),
    )

    class Meta:
        # Singular and plural labels are the same string.
        verbose_name = _('Limitation par liste de destinataires')
        verbose_name_plural = verbose_name

    def __str__(self):
        """Describe the limitation: its list, file types and recipient lists."""
        # NOTE(review): no space separates "filetypes" from the placeholder
        # in this message; it is kept as is because it is a translation key.
        template = _(u'sending limitation for list {mailing_list} to filetypes{filetypes} and lists {lists}')
        return template.format(
            mailing_list=self.mailing_list,
            filetypes=list_to_csv(self.filetypes.all()),
            lists=list_to_csv(self.lists.all()),
        )
@python_2_unicode_compatible
class DocbowProfile(Model):
    """Extra attributes of a user, stored in a one-to-one companion model."""

    user = OneToOneField(
        User,
        unique=True,
        on_delete=CASCADE,
    )
    is_guest = BooleanField(
        verbose_name=_('Guest user'),
        blank=True,
        default=False,
    )
    mobile_phone = CharField(
        max_length=32,
        verbose_name=_('Mobile phone'),
        blank=True,
        validators=[validate_phone],
    )
    personal_email = EmailField(
        _('personal email address'),
        blank=True,
        help_text=_(
            'if you provide a personal email address, notifications '
            'of new documents will also be sent to this address.'
        ),
    )
    accept_notifications = BooleanField(
        verbose_name=_('Accept to be notified'),
        blank=True,
        default=True,
        help_text=_("If unchecked you will not received notifications " "anymore, by email or SMS."),
    )
    external_id = CharField(
        max_length=256,
        verbose_name=_('External identifer'),
        blank=True,
    )

    def __str__(self):
        """Describe the profile: its user, mobile phone and personal email."""
        template = _(
            u'docbow profile of {user}:{user.id} with mobile phone '
            u'{mobile_phone} and personal email {personal_email}'
        )
        return template.format(
            user=self.user,
            mobile_phone=self.mobile_phone,
            personal_email=self.personal_email,
        )
class NotificationManager(Manager):
    """Manager creating notifications."""

    def notify(self, document=None, users=None, kind='new-document', ctx=None):
        """Build notifications in bulk.

        document - the related document, if any
        users - the users to notify, None means nobody
        kind - the kind of notification
        ctx - the context stored with each notification

        Unless the document is private, the active delegates of the users
        are notified too.  Users whose profile refuses notifications are
        skipped.
        """
        # ``users`` defaults to None, which cannot be converted to a set.
        users = set() if users is None else set(users)
        if document and not document.private:
            users |= set(User.objects.filter(delegations_by__by__in=users, is_active=True))
        users = User.objects.filter(pk__in=[u.pk for u in users]).exclude(
            docbowprofile__accept_notifications=False
        )
        notifications = [Notification(user=user, document=document, kind=kind, ctx=ctx) for user in users]
        self.bulk_create(notifications)
@python_2_unicode_compatible
class Notification(Model):
    """Asynchronous notification.

    create_dt - date and time of creation
    document - the related document
    user - the recipient of the notification
    kind - the kind of notification
    done - notification has been done successfully or not
    failure - report of the last failure; when done is True and this is
    empty the notification was successful
    ctx - a pickled object
    """

    objects = NotificationManager()

    create_dt = DateTimeField(auto_now_add=True)
    document = ForeignKey(
        Document,
        blank=True,
        null=True,
        on_delete=CASCADE,
    )
    user = ForeignKey(
        User,
        blank=True,
        null=True,
        on_delete=CASCADE,
    )
    kind = CharField(
        max_length=32,
        default='new-document',
    )
    done = BooleanField(
        blank=True,
        default=False,
    )
    failure = TextField(
        blank=True,
        null=True,
    )
    ctx = PickledObjectField(
        blank=True,
        null=True,
    )

    class Meta:
        ordering = ('-id',)

    def __str__(self):
        """Display a notification by its kind and identifier."""
        template = _(u'notification {0}:{1}')
        return template.format(self.kind, self.id)
class NotificationPreference(Model):
    """Preference of a user toward a notification method, per file type."""

    user = ForeignKey(
        User,
        verbose_name=_('user'),
        on_delete=CASCADE,
    )
    filetype = ForeignKey(
        FileType,
        verbose_name=_('file type'),
        on_delete=CASCADE,
    )
    kind = CharField(
        max_length=8,
        verbose_name=_('kind'),
    )
    value = BooleanField(
        default=True,
        verbose_name=_('value'),
    )

    class Meta:
        verbose_name = _('notification preference')
        verbose_name_plural = _('notification preferences')
        ordering = ('user__last_name', 'user__first_name', 'kind')
def guest_users():
    """Return the queryset of the active users flagged as guest in their
    docbow profile."""
    guests = User.objects.filter(docbowprofile__is_guest=True)
    return guests.filter(is_active=True)
def non_guest_users():
    """Return the queryset of the active users, excluding those flagged as
    guest in their docbow profile."""
    regulars = User.objects.exclude(docbowprofile__is_guest=True)
    return regulars.filter(is_active=True)
import watson.search as watson
class DocumentAdapter(watson.SearchAdapter):
    """Search adapter of documents, built on their comment and on the names
    of their attached files."""

    def gather_strings(self, obj):
        """Yield the comment of the document, then the name of each of its
        attached files."""
        yield obj.comment
        for attachment in obj.attached_files.all():
            yield attachment.name

    def get_title(self, obj):
        """Return the gathered strings joined with spaces, cut to 1000
        characters."""
        joined = u' '.join(self.gather_strings(obj))
        return joined[:1000]

    def get_description(self, obj):
        """Return the gathered strings joined with spaces."""
        joined = u' '.join(self.gather_strings(obj))
        return joined

    def get_content(self, obj):
        """Return an empty content."""
        return u''
# Register the Document model with watson, using DocumentAdapter to supply
# its title, description and content.
watson.register(Document, DocumentAdapter)