# combo - content management system
# Copyright (C) 2014-2016  Entr'ouvert
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import re

from django.conf import settings
from django.db import models
from django.utils.encoding import force_text, python_2_unicode_compatible
from django.utils.translation import ugettext_lazy as _
from django.utils.timezone import now, timedelta
from django.db.models import Q
from django.db.models.query import QuerySet

from combo.data.models import CellBase
from combo.data.library import register_cell_class
from combo.apps.pwa.models import PwaSettings


class NotificationQuerySet(QuerySet):
    """Chainable filters and bulk actions for Notification objects."""

    def namespace(self, namespace):
        """Keep notifications whose external id starts with '<namespace>:'."""
        return self.filter(external_id__startswith='%s:' % namespace)

    def find(self, user, id):
        """Return the notifications of ``user`` designated by ``id``.

        ``id`` is always compared to the external id; when it can be
        converted to an integer it is also compared to the primary key.
        """
        qs = self.filter(user=user)
        try:
            int(id)
        except (ValueError, TypeError):
            # not a number: it can only be an external id
            search_id = Q(external_id=id)
        else:
            search_id = Q(pk=id) | Q(external_id=id)
        return qs.filter(search_id)

    def ack(self):
        """Mark every notification of the queryset as acked."""
        self.update(acked=True)

    def visible(self, user=None, n=None):
        """Return notifications active at time ``n``, most recent first.

        A notification is active when it has started and either has no
        end timestamp or has not reached it yet.  ``n`` defaults to the
        current time; results are restricted to ``user`` when it is given.
        """
        qs = self
        if user:
            qs = qs.filter(user=user)
        n = n or now()
        qs = qs.filter(
            Q(start_timestamp__lte=n)
            & (Q(end_timestamp__isnull=True) | Q(end_timestamp__gte=n)))
        return qs.order_by('-start_timestamp')

    def new(self):
        """Keep only the notifications that were not acked."""
        return self.filter(acked=False)

    def forget(self):
        """Ack every notification and move its end timestamp into the past."""
        # an end timestamp slightly in the past excludes the rows from visible()
        past_end_timestamp = now() - timedelta(seconds=5)
        self.update(end_timestamp=past_end_timestamp, acked=True)


@python_2_unicode_compatible
class Notification(models.Model):
    """A message addressed to a user, shown between two timestamps."""

    # format required for external identifiers: "<namespace>:<name>"
    ID_RE = r'^[\w-]+:[\w-]+$'

    objects = NotificationQuerySet.as_manager()

    user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
    summary = models.CharField(_('Label'), max_length=140)
    body = models.TextField(_('Body'), default='', blank=True)
    url = models.URLField(_('URL'), default='', blank=True)
    origin = models.CharField(_('Origin'), max_length=100, blank=True)
    start_timestamp = models.DateTimeField(_('Start date and time'))
    end_timestamp = models.DateTimeField(_('End date and time'), null=True)
    acked = models.BooleanField(_('Acked'), default=False)
    external_id = models.SlugField(_('External identifier'), null=True)

    class Meta:
        verbose_name = _('Notification')
        unique_together = (
            ('user', 'external_id'),
        )

    def __str__(self):
        return self.summary

    @property
    def public_id(self):
        """External id when there is one, else the primary key as a string."""
        return self.external_id or str(self.pk)

    @classmethod
    def notify(cls, user, summary, id=None, body='', url='', origin='',
               start_timestamp=None, duration=None, end_timestamp=None,
               acked=None):
        '''
        Create or update a notification and return the Notification object.

        Create a new notification:
           Notification.notify(user, 'summary') -> notification

        Create a notification with a duration of one day:
           Notification.notify(user, 'summary', duration=3600*24)

        Renew an existing notification, or create a new one, with an
        external_id:
           Notification.notify(user, 'summary', id='id')

        start_timestamp defaults to the current time.  The end of the
        notification is, in order of priority: end_timestamp if given;
        no end at all if duration is 0; start_timestamp + duration
        (a timedelta, or a number of seconds); otherwise start_timestamp +
        settings.COMBO_DEFAULT_NOTIFICATION_DURATION days.

        If id can be converted to an integer and matches the primary key
        of an existing notification, that notification is updated.
        Otherwise a non-empty id is used as external id and must match
        ID_RE, else ValueError is raised.

        acked is only written when it is not None.
        '''
        start_timestamp = start_timestamp or now()

        if end_timestamp:
            pass
        elif duration == 0:
            end_timestamp = None
        elif duration:
            if not isinstance(duration, timedelta):
                duration = timedelta(seconds=duration)
            end_timestamp = start_timestamp + duration
        else:
            end_timestamp = start_timestamp + timedelta(
                days=settings.COMBO_DEFAULT_NOTIFICATION_DURATION)

        defaults = {
            'summary': summary,
            'body': body,
            'url': url,
            'origin': origin,
            'start_timestamp': start_timestamp,
            'end_timestamp': end_timestamp,
        }
        if acked is not None:
            defaults['acked'] = acked

        try:
            pk = int(id)
            # id is maybe an implicit id
            # NOTE(review): this lookup is not restricted to ``user``, so a
            # numeric id updates the matching row whoever owns it -- confirm
            # this is intended.
            notification = Notification.objects.get(pk=pk)
            Notification.objects.filter(pk=pk).update(**defaults)
            # update() only writes to the database; mirror the new values on
            # the instance fetched above so the caller does not get stale data
            for field_name, value in defaults.items():
                setattr(notification, field_name, value)
            return notification
        except (ValueError, TypeError, Notification.DoesNotExist):
            pass

        if id:
            try:
                id = force_text(id)
            except Exception as e:
                raise ValueError('id must be convertible to unicode', e)
            if not re.match(cls.ID_RE, id):
                raise ValueError('id must match regular expression %s' % cls.ID_RE)
            notification, created = Notification.objects.update_or_create(
                user=user, external_id=id, defaults=defaults)
        else:
            notification = Notification.objects.create(user=user, **defaults)
        return notification

    def forget(self):
        """Ack the notification and move its end timestamp into the past."""
        self.end_timestamp = now() - timedelta(seconds=5)
        self.acked = True
        self.save(update_fields=['end_timestamp', 'acked'])

    def ack(self):
        """Mark the notification as acked."""
        self.acked = True
        self.save(update_fields=['acked'])


@register_cell_class
class NotificationsCell(CellBase):
    """Cell listing the notifications of the logged-in user."""

    user_dependant = True
    ajax_refresh = 120
    loading_message = _('Loading notifications...')

    class Meta:
        verbose_name = _('User Notifications')

    def is_visible(self, user=None):
        """Hide the cell from anonymous users, else defer to the parent class."""
        if user is None or not user.is_authenticated:
            return False
        return super(NotificationsCell, self).is_visible(user)

    def get_cell_extra_context(self, context):
        """Add the user's notifications and the push setting to the context."""
        extra_context = super(NotificationsCell, self).get_cell_extra_context(context)
        user = getattr(context.get('request'), 'user', None)
        if user and user.is_authenticated:
            qs = Notification.objects.visible(user)
            extra_context['notifications'] = qs
            extra_context['new_notifications'] = qs.new()
        pwa_settings = PwaSettings.singleton()
        extra_context['push_notifications_enabled'] = pwa_settings.push_notifications
        return extra_context

    def get_badge(self, context):
        """Return {'badge': '<count>'} for unacked visible notifications.

        Returns None for anonymous users or when there is nothing new.
        """
        user = getattr(context.get('request'), 'user', None)
        if not user or not user.is_authenticated:
            return
        new_count = Notification.objects.visible(user).new().count()
        if not new_count:
            return
        return {'badge': str(new_count)}