204 lines
6.8 KiB
Python
204 lines
6.8 KiB
Python
# combo - content management system
|
|
# Copyright (C) 2014-2016 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import re
|
|
|
|
from django.conf import settings
|
|
from django.db import models
|
|
from django.db.models import Q
|
|
from django.db.models.query import QuerySet
|
|
from django.utils.encoding import force_str
|
|
from django.utils.timezone import now, timedelta
|
|
from django.utils.translation import gettext_lazy as _
|
|
|
|
from combo.apps.pwa.models import PwaSettings
|
|
from combo.data.library import register_cell_class
|
|
from combo.data.models import CellBase
|
|
|
|
|
|
class NotificationQuerySet(QuerySet):
    """Chainable query helpers for notifications."""

    def namespace(self, namespace):
        """Keep the notifications whose external id starts with ``<namespace>:``."""
        prefix = '%s:' % namespace
        return self.filter(external_id__startswith=prefix)

    def find(self, user, id):
        """Return the notifications of ``user`` designated by ``id``.

        ``id`` is always matched against the external identifier; when it can
        be converted to an integer it is matched against the primary key too.
        """
        owned = self.filter(user=user)
        try:
            int(id)
        except (ValueError, TypeError):
            # Not a number: it can only be an external identifier.
            return owned.filter(Q(external_id=id))
        return owned.filter(Q(pk=id) | Q(external_id=id))

    def ack(self):
        """Flag every notification of the queryset as acked."""
        self.update(acked=True)

    def visible(self, user=None, n=None):
        """Return the notifications active at ``n``, most recently started first.

        A notification is active when it started at or before ``n`` and has
        either no end timestamp or one at or after ``n``.  ``n`` defaults to
        the current time; when ``user`` is given (and truthy) only that
        user's notifications are kept.
        """
        moment = n or now()
        candidates = self.filter(user=user) if user else self
        already_started = Q(start_timestamp__lte=moment)
        not_yet_ended = Q(end_timestamp__isnull=True) | Q(end_timestamp__gte=moment)
        active = candidates.filter(already_started & not_yet_ended)
        return active.order_by('-start_timestamp')

    def new(self):
        """Keep the notifications that have not been acked."""
        return self.filter(acked=False)

    def forget(self):
        """Ack every notification and move its end timestamp 5 seconds into the past."""
        expired_since = now() - timedelta(seconds=5)
        self.update(end_timestamp=expired_since, acked=True)
|
|
|
|
|
|
class Notification(models.Model):
    """A message addressed to a user, shown between a start and an optional end timestamp."""

    # Pattern an external identifier passed to notify() must match:
    # two groups of word characters/dashes separated by a colon.
    ID_RE = r'^[\w-]+:[\w-]+$'

    objects = NotificationQuerySet.as_manager()

    user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
    summary = models.CharField(_('Label'), max_length=140)
    body = models.TextField(_('Body'), default='', blank=True)
    url = models.URLField(_('URL'), default='', blank=True)
    origin = models.CharField(_('Origin'), max_length=100, blank=True)
    start_timestamp = models.DateTimeField(_('Start date and time'))
    # A null end timestamp means the notification never expires
    # (see NotificationQuerySet.visible).
    end_timestamp = models.DateTimeField(_('End date and time'), null=True)
    acked = models.BooleanField(_('Acked'), default=False)
    external_id = models.SlugField(_('External identifier'), null=True)

    class Meta:
        verbose_name = _('Notification')
        # A given external identifier may exist only once per user.
        unique_together = (('user', 'external_id'),)

    def __str__(self):
        return str(self.summary)

    @property
    def public_id(self):
        """External identifier when there is one, the primary key as a string otherwise."""
        return self.external_id or str(self.pk)

    @classmethod
    def notify(
        cls,
        user,
        summary,
        id=None,
        body='',
        url='',
        origin='',
        start_timestamp=None,
        duration=None,
        end_timestamp=None,
        acked=None,
    ):
        """
        Create or renew a notification and return the Notification instance.

        Create a new notification:
            Notification.notify(user, 'summary') -> notification
        Create a notification with a duration of one day:
            Notification.notify(user, 'summary', duration=3600*24)
        Renew an existing notification, or create a new one, with an external_id:
            Notification.notify(user, 'summary', id='id')

        ``start_timestamp`` defaults to the current time.  The end timestamp is
        the first applicable of: ``end_timestamp`` if truthy; none at all if
        ``duration == 0``; ``start_timestamp + duration`` (a timedelta or a
        number of seconds) if ``duration`` is truthy; otherwise
        ``start_timestamp`` plus ``settings.COMBO_DEFAULT_NOTIFICATION_DURATION``
        days.  ``acked`` is only written when it is not None.

        ``id`` may be the primary key of an existing notification (anything
        ``int()`` accepts) or an external identifier matching ``ID_RE``.

        Raises ValueError if ``id`` is given, does not designate an existing
        primary key, and cannot be converted to a string or does not match
        ``ID_RE``.
        """
        start_timestamp = start_timestamp or now()

        if not end_timestamp:
            if duration == 0:
                # An explicit zero duration means "no expiry".
                end_timestamp = None
            elif duration:
                if not isinstance(duration, timedelta):
                    duration = timedelta(seconds=duration)
                end_timestamp = start_timestamp + duration
            else:
                end_timestamp = start_timestamp + timedelta(days=settings.COMBO_DEFAULT_NOTIFICATION_DURATION)

        defaults = {
            'summary': summary,
            'body': body,
            'url': url,
            'origin': origin,
            'start_timestamp': start_timestamp,
            'end_timestamp': end_timestamp,
        }
        if acked is not None:
            defaults['acked'] = acked

        try:
            pk = int(id)
            # id is maybe an implicit id
            # NOTE(review): this lookup is not restricted to ``user``; confirm
            # that updating by bare primary key is intended.
            notification = Notification.objects.get(pk=pk)
            Notification.objects.filter(pk=pk).update(**defaults)
            # QuerySet.update() only touches the database row: mirror the new
            # values on the instance so the caller does not get stale data.
            for field_name, value in defaults.items():
                setattr(notification, field_name, value)
            return notification
        except (ValueError, TypeError, Notification.DoesNotExist):
            # Not an integer, or no such primary key: treat id as external.
            pass

        if id:
            try:
                id = force_str(id)
            except Exception as e:
                raise ValueError('id must be convertible to unicode', e) from e
            if not re.match(cls.ID_RE, id):
                raise ValueError('id must match regular expression %s' % cls.ID_RE)
            notification, dummy = Notification.objects.update_or_create(
                user=user, external_id=id, defaults=defaults
            )
        else:
            notification = Notification.objects.create(user=user, **defaults)
        return notification

    def forget(self):
        """Ack the notification and move its end timestamp 5 seconds into the past."""
        self.end_timestamp = now() - timedelta(seconds=5)
        self.acked = True
        self.save(update_fields=['end_timestamp', 'acked'])

    def ack(self):
        """Flag the notification as acked, saving only that field."""
        self.acked = True
        self.save(update_fields=['acked'])
|
|
|
|
|
|
@register_cell_class
class NotificationsCell(CellBase):
    """Cell listing the visible notifications of the logged-in user."""

    # NOTE(review): these attributes are consumed by CellBase, which is not
    # visible here; ajax_refresh is presumably a period in seconds - confirm.
    user_dependant = True
    ajax_refresh = 120
    loading_message = _('Loading notifications...')

    class Meta:
        verbose_name = _('User Notifications')

    def is_visible(self, request, **kwargs):
        """Hide the cell when the request has no authenticated user; defer to the parent otherwise."""
        requester = getattr(request, 'user', None)
        if requester is not None and requester.is_authenticated:
            return super().is_visible(request, **kwargs)
        return False

    def get_cell_extra_context(self, context):
        """Extend the parent context with the user's notifications.

        For an authenticated user this adds ``notifications`` (the visible
        ones) and ``new_notifications`` (the visible, un-acked ones).
        ``push_notifications_enabled`` is always added, from the PWA settings.
        """
        extra_context = super().get_cell_extra_context(context)
        request = context.get('request')
        requester = getattr(request, 'user', None)
        if requester and requester.is_authenticated:
            visible = Notification.objects.visible(requester)
            extra_context['notifications'] = visible
            extra_context['new_notifications'] = visible.new()
        extra_context['push_notifications_enabled'] = PwaSettings.singleton().push_notifications
        return extra_context

    def get_badge(self, context):
        """Return ``{'badge': '<count>'}`` for the user's visible un-acked notifications.

        Returns None when there is no authenticated user or when the count is zero.
        """
        requester = getattr(context.get('request'), 'user', None)
        if requester and requester.is_authenticated:
            unread = Notification.objects.visible(requester).new().count()
            if unread:
                return {'badge': str(unread)}
        return None
|