This repository has been archived on 2023-02-21. You can view files and clone it, but cannot push or open issues or pull requests.
corbo/corbo/models.py

216 lines
8.1 KiB
Python

import os
import io
import hashlib
from time import mktime
from datetime import datetime
from lxml import etree
import requests
import feedparser
from django.utils import timezone
from django.utils.encoding import force_text
from django.conf import settings
from django.db import models
from django.core.files.storage import DefaultStorage
from django.utils.translation import ugettext_lazy as _
from ckeditor.fields import RichTextField
from . import utils
channel_choices = (
('mailto', _('Email')),
('sms', _('SMS')),
)
class Category(models.Model):
name = models.CharField(_('Name'), max_length=64, blank=False, null=False)
slug = models.SlugField(_('Slug'), unique=True)
rss_feed_url = models.URLField(_('Feed URL'), blank=True, null=True,
help_text=_('if defined, announces will be automatically created from rss items'))
ctime = models.DateTimeField(auto_now_add=True)
class Meta:
ordering = ['name']
def __unicode__(self):
return self.name
def get_announces_count(self):
return self.announce_set.all().count()
def get_subscriptions_count(self):
return self.subscription_set.all().count()
def save(self, *args, **kwargs):
super(Category, self).save(*args, **kwargs)
if not self.rss_feed_url:
return
feed_response = requests.get(self.rss_feed_url, proxies=settings.REQUESTS_PROXIES)
if feed_response.ok:
content = feedparser.parse(feed_response.content)
for entry in content.get('entries', []):
published = datetime.fromtimestamp(mktime(entry.published_parsed))
announce, created = Announce.objects.get_or_create(identifier=entry['id'],
category=self)
announce.title = entry['title']
announce.text = entry['summary']
announce.publication_time = published
announce.save()
if created:
Broadcast.objects.get_or_create(announce=announce)
class Announce(models.Model):
category = models.ForeignKey('Category', verbose_name=_('category'))
title = models.CharField(_('title'), max_length=256,
help_text=_('maximum 256 characters'))
identifier = models.CharField(max_length=256, null=True, blank=True)
text = RichTextField(_('Content'))
publication_time = models.DateTimeField(_('Publication date'), blank=True,
null=True)
expiration_time = models.DateTimeField(_('Expiration date'), blank=True,
null=True)
ctime = models.DateTimeField(_('creation time'), auto_now_add=True)
mtime = models.DateTimeField(_('modification time'), auto_now=True)
def save(self, *args, **kwargs):
if self.text:
html_tree = etree.HTML(self.text)
storage = DefaultStorage()
file_counter = 1
for img in html_tree.xpath('//img'):
if img.attrib['src'].startswith('/'):
continue
image_name = os.path.basename(img.attrib['src'])
r = requests.get(img.attrib['src'], proxies=settings.REQUESTS_PROXIES)
if not r.ok:
continue
new_content = r.content
# get announce images list
dirs, files = storage.listdir(self.images_path)
existing_file = None
# compute next filename
files.sort()
for f in files:
if image_name == f.split('_', 1)[-1]:
existing_file = f
try:
file_counter = int(f.split('_', 1)[0])
file_counter += 1
except ValueError:
file_counter = 1
if existing_file:
existing_file_path = os.path.join(self.images_path, existing_file)
old_content = storage.open(existing_file_path).read()
old_hash = hashlib.md5(old_content).hexdigest()
new_hash = hashlib.md5(new_content).hexdigest()
img.attrib['src'] = storage.url(existing_file_path)
if new_hash == old_hash:
continue
file_counter = str(file_counter).zfill(2)
image_name = '%s_%s' % (file_counter, image_name)
image_name = os.path.join(self.images_path, image_name)
storage.save(image_name, io.BytesIO(new_content))
img.attrib['src'] = storage.url(image_name)
self.text = force_text(etree.tostring(html_tree))
super(Announce, self).save(*args, **kwargs)
@property
def images_path(self):
path = os.path.join('images', str(self.id))
storage = DefaultStorage()
if not storage.exists(path):
os.makedirs(storage.path(path))
return path
def __unicode__(self):
return u'{title} ({id}) at {mtime}'.format(
title=self.title, id=self.id, mtime=self.mtime)
def is_expired(self):
if self.expiration_time:
return self.expiration_time < timezone.now()
return False
def is_published(self):
if self.publication_time:
return self.publication_time <= timezone.now()
return False
class Meta:
verbose_name = _('announce')
ordering = ('-mtime',)
class Broadcast(models.Model):
announce = models.ForeignKey(Announce, verbose_name=_('announce'))
deliver_time = models.DateTimeField(_('Deliver time'), null=True)
delivery_count = models.IntegerField(_('Delivery count'), default=0)
def __unicode__(self):
if self.deliver_time:
return u'announce {id} delivered at {time}'.format(
id=self.announce.id, time=self.deliver_time)
return u'announce {id} to deliver'.format(id=self.announce.id)
def filter_destinations(self, destinations, prefix):
return [dest for dest in destinations if dest.startswith('%s:' % prefix)]
def send_sms(self, title, content, destinations, category_id):
return utils.send_sms(content, destinations)
def send_mailto(self, title, content, destinations, category_id):
return utils.send_email(title, content, destinations, category_id)
def send(self):
total_sent = 0
destinations = [s.identifier for s in self.announce.category.subscription_set.all() if s.identifier]
for channel_name, verbose_name in channel_choices:
action = getattr(self, 'send_' + channel_name)
filtered_destinations = self.filter_destinations(destinations, channel_name)
total_sent += action(self.announce.title, self.announce.text,
filtered_destinations, self.announce.category.id)
self.delivery_count = total_sent
self.deliver_time = timezone.now()
self.save()
class Meta:
verbose_name = _('sent')
ordering = ('-deliver_time',)
class Subscription(models.Model):
category = models.ForeignKey('Category', verbose_name=_('Category'))
uuid = models.CharField(_('User identifier'), max_length=128, blank=True)
identifier = models.CharField(_('identifier'), max_length=128, blank=True,
help_text=_('ex.: mailto, ...'))
def __unicode__(self):
return '%s - %s - %s' % (self.uuid, self.identifier, self.category.name)
def get_identifier_display(self):
try:
scheme, identifier = self.identifier.split(':')
return identifier
except ValueError:
return self.identifier
class Meta:
unique_together = ('category', 'identifier', 'uuid')
def clean(self):
if 'sms:' in self.identifier:
uri, phonenumber = self.identifier.split(':', 1)
self.identifier = '%s:%s' % (uri, utils.format_phonenumber(phonenumber))
def save(self, *args, **kwargs):
self.clean()
return super(Subscription, self).save(*args, **kwargs)